You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/09 06:30:49 UTC

[GitHub] [flink] KurtYoung commented on a change in pull request #11892: [FLINK-17112][table] Support DESCRIBE statement in Flink SQL

KurtYoung commented on a change in pull request #11892:
URL: https://github.com/apache/flink/pull/11892#discussion_r422459277



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -929,16 +935,69 @@ private TableResult executeOperation(Operation operation) {
 					.setPrintStyle(TableResultImpl.PrintStyle.RAW_CONTENT)
 					.build();
 
+		} else if (operation instanceof DescribeTableOperation) {
+			DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+			Optional<CatalogManager.TableLookupResult> result =
+					catalogManager.getTable(describeTableOperation.getSqlIdentifier());
+			if (result.isPresent()) {
+				return buildDescribeResult(result.get().getTable().getSchema());
+			} else {
+				throw new ValidationException(String.format(
+						"Tables or views with the identifier '%s' doesn't exist",
+						describeTableOperation.getSqlIdentifier().asSummaryString()));
+			}
 		} else {
 			throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
 		}
 	}
 
 	private TableResult buildShowResult(String[] objects) {
+		return buildResult(
+			new String[]{"result"},
+			new DataType[]{DataTypes.STRING()},
+			Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
+	}
+
+	private TableResult buildDescribeResult(TableSchema schema) {
+		Map<String, String> fieldToWatermark =
+				schema.getWatermarkSpecs()
+						.stream()
+						.collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, WatermarkSpec::getWatermarkExpr));
+
+		Map<String, String> fieldToPrimaryKey = new HashMap<>();
+		schema.getPrimaryKey().ifPresent((p) -> {
+			List<String> columns = p.getColumns();
+			columns.forEach((c) -> fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(",", columns))));
+		});
+
+		Object[][] rows =

Review comment:
       use String[][]? The fields are all strings

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -985,6 +987,67 @@ class TableEnvironmentTest {
     }
   }
 
+  @Test
+  def testDescribeTableOrView(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE TABLE T1(
+        |  a int not null,

Review comment:
       please add all supported types here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org