You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/12 12:41:32 UTC
[flink] branch master updated: [FLINK-17112][table] Support
DESCRIBE statement in Flink SQL
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c5c6b76 [FLINK-17112][table] Support DESCRIBE statement in Flink SQL
c5c6b76 is described below
commit c5c6b7612d0b5492ec4db8233709bf0a1093a1fe
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Tue May 12 20:41:16 2020 +0800
[FLINK-17112][table] Support DESCRIBE statement in Flink SQL
This closes #11892
---
.../table/api/internal/TableEnvironmentImpl.java | 69 +++++++++++-
.../flink/table/api/internal/TableResultImpl.java | 22 ++--
.../table/operations/DescribeTableOperation.java | 59 +++++++++++
.../org/apache/flink/table/utils/PrintUtils.java | 15 ++-
.../operations/SqlToOperationConverter.java | 12 +++
.../table/planner/calcite/FlinkPlannerImpl.scala | 7 +-
.../flink/table/api/TableEnvironmentTest.scala | 116 ++++++++++++++++++++-
.../table/sqlexec/SqlToOperationConverter.java | 12 +++
.../flink/table/api/internal/TableEnvImpl.scala | 60 ++++++++++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 7 +-
.../api/batch/BatchTableEnvironmentTest.scala | 28 +++++
11 files changed, 382 insertions(+), 25 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 23752c3..de7bd96 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
@@ -76,6 +77,7 @@ import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
@@ -112,13 +114,17 @@ import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -156,7 +162,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
"CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
"CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " +
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, " +
- "INSERT.";
+ "INSERT, DESCRIBE.";
/**
* Provides necessary methods for {@link ConnectTableDescriptor}.
@@ -712,7 +718,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(tableSchema)
.data(tableSink.getResultIterator())
- .setPrintStyle(TableResultImpl.PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+ .setPrintStyle(TableResultImpl.PrintStyle.tableau(
+ PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
.build();
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
@@ -966,6 +973,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
.data(Collections.singletonList(Row.of(explanation)))
.setPrintStyle(TableResultImpl.PrintStyle.rawContent())
.build();
+ } else if (operation instanceof DescribeTableOperation) {
+ DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+ Optional<CatalogManager.TableLookupResult> result =
+ catalogManager.getTable(describeTableOperation.getSqlIdentifier());
+ if (result.isPresent()) {
+ return buildDescribeResult(result.get().getTable().getSchema());
+ } else {
+ throw new ValidationException(String.format(
+ "Tables or views with the identifier '%s' doesn't exist",
+ describeTableOperation.getSqlIdentifier().asSummaryString()));
+ }
} else if (operation instanceof QueryOperation) {
return executeInternal((QueryOperation) operation);
} else {
@@ -974,10 +992,53 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
private TableResult buildShowResult(String[] objects) {
+ return buildResult(
+ new String[]{"result"},
+ new DataType[]{DataTypes.STRING()},
+ Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
+ }
+
+ private TableResult buildDescribeResult(TableSchema schema) {
+ Map<String, String> fieldToWatermark =
+ schema.getWatermarkSpecs()
+ .stream()
+ .collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, WatermarkSpec::getWatermarkExpr));
+
+ Map<String, String> fieldToPrimaryKey = new HashMap<>();
+ schema.getPrimaryKey().ifPresent((p) -> {
+ List<String> columns = p.getColumns();
+ columns.forEach((c) -> fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(", ", columns))));
+ });
+
+ Object[][] rows =
+ schema.getTableColumns()
+ .stream()
+ .map((c) -> {
+ LogicalType logicalType = c.getType().getLogicalType();
+ return new Object[]{
+ c.getName(),
+ StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+ logicalType.isNullable(),
+ fieldToPrimaryKey.getOrDefault(c.getName(), null),
+ c.getExpr().orElse(null),
+ fieldToWatermark.getOrDefault(c.getName(), null)};
+ }).toArray(Object[][]::new);
+
+ return buildResult(
+ new String[]{"name", "type", "null", "key", "computed column", "watermark"},
+ new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
+ rows);
+ }
+
+ private TableResult buildResult(String[] headers, DataType[] types, Object[][] rows) {
return TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
- .data(Arrays.stream(objects).map(Row::of).collect(Collectors.toList()))
+ .tableSchema(
+ TableSchema.builder().fields(
+ headers,
+ types).build())
+ .data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
+ .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
.build();
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index d04bb2e..5c82f5e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -92,7 +92,9 @@ class TableResultImpl implements TableResult {
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
- PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth);
+ String nullColumn = ((TableauStyle) printStyle).getNullColumn();
+ PrintUtils.printAsTableauForm(
+ getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
@@ -114,7 +116,7 @@ class TableResultImpl implements TableResult {
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private Iterator<Row> data = null;
- private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE);
+ private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
private Builder() {
}
@@ -195,12 +197,13 @@ class TableResultImpl implements TableResult {
*/
public interface PrintStyle {
/**
- * Create a tableau print style with given max column width,
+ * Create a tableau print style with given max column width and null column,
* which prints the result schema and content as tableau form.
*/
- static PrintStyle tableau(int maxColumnWidth) {
+ static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
- return new TableauStyle(maxColumnWidth);
+ Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
+ return new TableauStyle(maxColumnWidth, nullColumn);
}
/**
@@ -217,15 +220,22 @@ class TableResultImpl implements TableResult {
* print the result schema and content as tableau form.
*/
private static final class TableauStyle implements PrintStyle {
+
private final int maxColumnWidth;
+ private final String nullColumn;
- private TableauStyle(int maxColumnWidth) {
+ private TableauStyle(int maxColumnWidth, String nullColumn) {
this.maxColumnWidth = maxColumnWidth;
+ this.nullColumn = nullColumn;
}
int getMaxColumnWidth() {
return maxColumnWidth;
}
+
+ String getNullColumn() {
+ return nullColumn;
+ }
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
new file mode 100644
index 0000000..ecd8bc7
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier statement.
+ */
+public class DescribeTableOperation implements Operation {
+
+ private final ObjectIdentifier sqlIdentifier;
+ private final boolean isExtended;
+
+ public DescribeTableOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) {
+ this.sqlIdentifier = sqlIdentifier;
+ this.isExtended = isExtended;
+ }
+
+ public ObjectIdentifier getSqlIdentifier() {
+ return sqlIdentifier;
+ }
+
+ public boolean isExtended() {
+ return isExtended;
+ }
+
+ @Override
+ public String asSummaryString() {
+ Map<String, Object> params = new LinkedHashMap<>();
+ params.put("identifier", sqlIdentifier);
+ params.put("isExtended", isExtended);
+ return OperationUtils.formatWithChildren(
+ "DESCRIBE",
+ params,
+ Collections.emptyList(),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
index 4de488e..7916fa1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
@@ -40,7 +40,7 @@ public class PrintUtils {
// constants for printing
public static final int MAX_COLUMN_WIDTH = 30;
- private static final String NULL_COLUMN = "(NULL)";
+ public static final String NULL_COLUMN = "(NULL)";
private static final String COLUMN_TRUNCATED_FLAG = "...";
private PrintUtils() {
@@ -65,7 +65,7 @@ public class PrintUtils {
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter) {
- printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH);
+ printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
}
/**
@@ -87,14 +87,15 @@ public class PrintUtils {
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter,
- int maxColumnWidth) {
+ int maxColumnWidth,
+ String nullColumn) {
List<String[]> rows = new ArrayList<>();
// fill field names first
List<TableColumn> columns = tableSchema.getTableColumns();
rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
while (it.hasNext()) {
- rows.add(rowToString(it.next()));
+ rows.add(rowToString(it.next(), nullColumn));
}
int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
@@ -123,11 +124,15 @@ public class PrintUtils {
}
public static String[] rowToString(Row row) {
+ return rowToString(row, NULL_COLUMN);
+ }
+
+ public static String[] rowToString(Row row, String nullColumn) {
final String[] fields = new String[row.getArity()];
for (int i = 0; i < row.getArity(); i++) {
final Object field = row.getField(i);
if (field == null) {
- fields[i] = NULL_COLUMN;
+ fields[i] = nullColumn;
} else {
fields[i] = StringUtils.arrayAwareToString(field);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 49138d1..11dece9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -39,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -64,6 +65,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -205,6 +207,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
} else if (validated instanceof SqlExplain) {
return Optional.of(converter.convertExplain((SqlExplain) validated));
+ } else if (validated instanceof SqlRichDescribeTable) {
+ return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -610,6 +614,14 @@ public class SqlToOperationConverter {
return new ExplainOperation(operation);
}
+ /** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */
+ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+ UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+ ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+ return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+ }
+
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 846f536..b38fa92 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
+
import com.google.common.collect.ImmutableList
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan._
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
import java.lang.{Boolean => JBoolean}
import java.util
import java.util.function.{Function => JFunction}
@@ -126,7 +128,8 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowDatabases]
|| sqlNode.isInstanceOf[SqlShowTables]
|| sqlNode.isInstanceOf[SqlShowFunctions]
- || sqlNode.isInstanceOf[SqlShowViews]) {
+ || sqlNode.isInstanceOf[SqlShowViews]
+ || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
return sqlNode
}
sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 93177a9..a890d7c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -728,9 +728,11 @@ class TableEnvironmentTest {
val sourceDDL =
"""
|CREATE TABLE T1(
- | a int,
+ | a int not null,
| b varchar,
- | c int
+ | c int,
+ | ts AS to_timestamp(b),
+ | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
|) with (
| 'connector' = 'COLLECTION'
|)
@@ -975,6 +977,116 @@ class TableEnvironmentTest {
}
}
+ @Test
+ def testDescribeTableOrView(): Unit = {
+ val sourceDDL =
+ """
+ |CREATE TABLE T1(
+ | f0 char(10),
+ | f1 varchar(10),
+ | f2 string,
+ | f3 BOOLEAN,
+ | f4 BINARY(10),
+ | f5 VARBINARY(10),
+ | f6 BYTES,
+ | f7 DECIMAL(10, 3),
+ | f8 TINYINT,
+ | f9 SMALLINT,
+ | f10 INTEGER,
+ | f11 BIGINT,
+ | f12 FLOAT,
+ | f13 DOUBLE,
+ | f14 DATE,
+ | f15 TIME,
+ | f16 TIMESTAMP,
+ | f17 TIMESTAMP(3),
+ | f18 TIMESTAMP WITHOUT TIME ZONE,
+ | f19 TIMESTAMP(3) WITH LOCAL TIME ZONE,
+ | f20 TIMESTAMP WITH LOCAL TIME ZONE,
+ | f21 ARRAY<INT>,
+ | f22 MAP<INT, STRING>,
+ | f23 ROW<f0 INT, f1 STRING>,
+ | f24 int not null,
+ | f25 varchar not null,
+ | f26 row<f0 int not null, f1 int> not null,
+ | ts AS to_timestamp(f25),
+ | PRIMARY KEY(f24, f26) NOT ENFORCED,
+ | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
+ |) with (
+ | 'connector' = 'COLLECTION'
+ |)
+ """.stripMargin
+
+ val viewDDL =
+ """
+ |CREATE VIEW IF NOT EXISTS T2(d, e, f) AS SELECT f24, f25, f26 FROM T1
+ """.stripMargin
+ tableEnv.executeSql(sourceDDL)
+ tableEnv.executeSql(viewDDL)
+
+ val tableResult1 = tableEnv.executeSql("describe T1")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+ checkData(
+ util.Arrays.asList(
+ Row.of("f0", "CHAR(10)", Boolean.box(true), null, null, null),
+ Row.of("f1", "VARCHAR(10)", Boolean.box(true), null, null, null),
+ Row.of("f2", "STRING", Boolean.box(true), null, null, null),
+ Row.of("f3", "BOOLEAN", Boolean.box(true), null, null, null),
+ Row.of("f4", "BINARY(10)", Boolean.box(true), null, null, null),
+ Row.of("f5", "VARBINARY(10)", Boolean.box(true), null, null, null),
+ Row.of("f6", "BYTES", Boolean.box(true), null, null, null),
+ Row.of("f7", "DECIMAL(10, 3)", Boolean.box(true), null, null, null),
+ Row.of("f8", "TINYINT", Boolean.box(true), null, null, null),
+ Row.of("f9", "SMALLINT", Boolean.box(true), null, null, null),
+ Row.of("f10", "INT", Boolean.box(true), null, null, null),
+ Row.of("f11", "BIGINT", Boolean.box(true), null, null, null),
+ Row.of("f12", "FLOAT", Boolean.box(true), null, null, null),
+ Row.of("f13", "DOUBLE", Boolean.box(true), null, null, null),
+ Row.of("f14", "DATE", Boolean.box(true), null, null, null),
+ Row.of("f15", "TIME(0)", Boolean.box(true), null, null, null),
+ Row.of("f16", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+ Row.of("f17", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+ Row.of("f18", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+ Row.of("f19", "TIMESTAMP(3) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+ Row.of("f20", "TIMESTAMP(6) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+ Row.of("f21", "ARRAY<INT>", Boolean.box(true), null, null, null),
+ Row.of("f22", "MAP<INT, STRING>", Boolean.box(true), null, null, null),
+ Row.of("f23", "ROW<`f0` INT, `f1` STRING>", Boolean.box(true), null, null, null),
+ Row.of("f24", "INT", Boolean.box(false), "PRI(f24, f26)", null, null),
+ Row.of("f25", "STRING", Boolean.box(false), null, null, null),
+ Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false),
+ "PRI(f24, f26)", null, null),
+ Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
+ "`ts` - INTERVAL '1' SECOND")
+ ).iterator(),
+ tableResult1.collect())
+
+ val tableResult2 = tableEnv.executeSql("describe T2")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ checkData(
+ util.Arrays.asList(
+ Row.of("d", "INT", Boolean.box(false), null, null, null),
+ Row.of("e", "STRING", Boolean.box(false), null, null, null),
+ Row.of("f", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), null, null, null)
+ ).iterator(),
+ tableResult2.collect())
+
+ // temporary view T2(x, y) masks permanent view T2(d, e, f)
+ val temporaryViewDDL =
+ """
+ |CREATE TEMPORARY VIEW IF NOT EXISTS T2(x, y) AS SELECT f24, f25 FROM T1
+ """.stripMargin
+ tableEnv.executeSql(temporaryViewDDL)
+
+ val tableResult3 = tableEnv.executeSql("describe T2")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+ checkData(
+ util.Arrays.asList(
+ Row.of("x", "INT", Boolean.box(false), null, null, null),
+ Row.of("y", "STRING", Boolean.box(false), null, null, null)).iterator(),
+ tableResult3.collect())
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index a465f56..a0239c4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -36,6 +36,7 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -61,6 +62,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
@@ -193,6 +195,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
} else if (validated instanceof SqlExplain) {
return Optional.of(converter.convertExplain((SqlExplain) validated));
+ } else if (validated instanceof SqlRichDescribeTable) {
+ return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -575,6 +579,14 @@ public class SqlToOperationConverter {
return new ExplainOperation(operation);
}
+ /** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */
+ private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+ UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+ ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+ return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+ }
+
/**
* Create a table schema from {@link SqlCreateTable}. This schema contains computed column
* fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index ab89dda..46fe90e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -49,6 +49,8 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.FrameworkConfig
+import org.apache.commons.lang3.StringUtils
+
import _root_.java.lang.{Iterable => JIterable, Long => JLong}
import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
@@ -140,7 +142,7 @@ abstract class TableEnvImpl(
"CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
"CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, USE CATALOG, USE [CATALOG.]DATABASE, " +
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " +
- "SHOW VIEWS, INSERT."
+ "SHOW VIEWS, INSERT, DESCRIBE."
private def isStreamingMode: Boolean = this match {
case _: BatchTableEnvImpl => false
@@ -626,7 +628,7 @@ abstract class TableEnvImpl(
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(tableSchema)
.data(tableSink.getResultIterator)
- .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+ .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
.build
} catch {
case e: Exception =>
@@ -809,6 +811,15 @@ abstract class TableEnvImpl(
.data(JCollections.singletonList(Row.of(explanation)))
.setPrintStyle(PrintStyle.rawContent())
.build
+ case descOperation: DescribeTableOperation =>
+ val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+ if (result.isPresent) {
+ buildDescribeResult(result.get.getTable.getSchema)
+ } else {
+ throw new ValidationException(String.format(
+ "Table or view with identifier '%s' doesn't exist",
+ descOperation.getSqlIdentifier.asSummaryString()))
+ }
case queryOperation: QueryOperation =>
executeInternal(queryOperation)
@@ -818,10 +829,51 @@ abstract class TableEnvImpl(
}
private def buildShowResult(objects: Array[String]): TableResult = {
+ val rows = Array.ofDim[Object](objects.length, 1)
+ objects.zipWithIndex.foreach {
+ case (obj, i) => rows(i)(0) = obj
+ }
+ buildResult(Array("result"), Array(DataTypes.STRING), rows)
+ }
+
+ private def buildDescribeResult(schema: TableSchema): TableResult = {
+ val fieldToWatermark =
+ schema
+ .getWatermarkSpecs
+ .map(w => (w.getRowtimeAttribute, w.getWatermarkExpr)).toMap
+ val fieldToPrimaryKey = new JHashMap[String, String]()
+ if (schema.getPrimaryKey.isPresent) {
+ val columns = schema.getPrimaryKey.get.getColumns.asScala
+ columns.foreach(c => fieldToPrimaryKey.put(c, s"PRI(${columns.mkString(", ")})"))
+ }
+ val data = Array.ofDim[Object](schema.getFieldCount, 6)
+ schema.getTableColumns.asScala.zipWithIndex.foreach {
+ case (c, i) => {
+ val logicalType = c.getType.getLogicalType
+ data(i)(0) = c.getName
+ data(i)(1) = StringUtils.removeEnd(logicalType.toString, " NOT NULL")
+ data(i)(2) = Boolean.box(logicalType.isNullable)
+ data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, null)
+ data(i)(4) = c.getExpr.orElse(null)
+ data(i)(5) = fieldToWatermark.getOrDefault(c.getName, null)
+ }
+ }
+ buildResult(
+ Array("name", "type", "null", "key", "compute column", "watermark"),
+ Array(DataTypes.STRING, DataTypes.STRING, DataTypes.BOOLEAN, DataTypes.STRING,
+ DataTypes.STRING, DataTypes.STRING),
+ data)
+ }
+
+ private def buildResult(
+ headers: Array[String],
+ types: Array[DataType],
+ rows: Array[Array[Object]]): TableResult = {
TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
- .data(objects.map(Row.of(_)).toList)
+ .tableSchema(
+ TableSchema.builder().fields(headers, types).build())
+ .data(rows.map(Row.of(_:_*)).toList)
.build()
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 3d7dae4..4ac4d09 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
package org.apache.flink.table.calcite
import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.catalog.CatalogReader
+
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelRoot
@@ -31,6 +32,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
import _root_.java.lang.{Boolean => JBoolean}
import _root_.java.util
import _root_.java.util.function.{Function => JFunction}
@@ -124,7 +126,8 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowDatabases]
|| sqlNode.isInstanceOf[SqlShowTables]
|| sqlNode.isInstanceOf[SqlShowFunctions]
- || sqlNode.isInstanceOf[SqlShowViews]) {
+ || sqlNode.isInstanceOf[SqlShowViews]
+ || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
return sqlNode
}
sqlNode match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index dc6ad58..87e0a9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -527,6 +527,34 @@ class BatchTableEnvironmentTest extends TableTestBase {
}
}
+ @Test
+ def testExecuteSqlWithDescribe(): Unit = {
+ val testUtil = batchTestUtil()
+ val createTableStmt =
+ """
+ |CREATE TABLE tbl1 (
+ | a bigint,
+ | b int,
+ | c varchar
+ |) with (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ val tableResult1 = testUtil.tableEnv.executeSql(createTableStmt)
+ assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+ val tableResult2 = testUtil.tableEnv.executeSql("DESCRIBE tbl1")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ checkData(
+ java.util.Arrays.asList(
+ Row.of("a", "BIGINT", Boolean.box(true), null, null, null),
+ Row.of("b", "INT", Boolean.box(true), null, null, null),
+ Row.of("c", "STRING", Boolean.box(true), null, null, null)
+ ).iterator(),
+ tableResult2.collect())
+ }
+
private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())