You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/05/12 12:41:32 UTC

[flink] branch master updated: [FLINK-17112][table] Support DESCRIBE statement in Flink SQL

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c6b76  [FLINK-17112][table] Support DESCRIBE statement in Flink SQL
c5c6b76 is described below

commit c5c6b7612d0b5492ec4db8233709bf0a1093a1fe
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Tue May 12 20:41:16 2020 +0800

    [FLINK-17112][table] Support DESCRIBE statement in Flink SQL
    
    This closes #11892
---
 .../table/api/internal/TableEnvironmentImpl.java   |  69 +++++++++++-
 .../flink/table/api/internal/TableResultImpl.java  |  22 ++--
 .../table/operations/DescribeTableOperation.java   |  59 +++++++++++
 .../org/apache/flink/table/utils/PrintUtils.java   |  15 ++-
 .../operations/SqlToOperationConverter.java        |  12 +++
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   7 +-
 .../flink/table/api/TableEnvironmentTest.scala     | 116 ++++++++++++++++++++-
 .../table/sqlexec/SqlToOperationConverter.java     |  12 +++
 .../flink/table/api/internal/TableEnvImpl.scala    |  60 ++++++++++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |   7 +-
 .../api/batch/BatchTableEnvironmentTest.scala      |  28 +++++
 11 files changed, 382 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 23752c3..de7bd96 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
@@ -76,6 +77,7 @@ import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -112,13 +114,17 @@ import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -156,7 +162,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			"CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
 			"CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " +
 			"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, " +
-			"INSERT.";
+			"INSERT, DESCRIBE.";
 
 	/**
 	 * Provides necessary methods for {@link ConnectTableDescriptor}.
@@ -712,7 +718,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 					.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
 					.tableSchema(tableSchema)
 					.data(tableSink.getResultIterator())
-					.setPrintStyle(TableResultImpl.PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+					.setPrintStyle(TableResultImpl.PrintStyle.tableau(
+							PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
 					.build();
 		} catch (Exception e) {
 			throw new TableException("Failed to execute sql", e);
@@ -966,6 +973,17 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 					.data(Collections.singletonList(Row.of(explanation)))
 					.setPrintStyle(TableResultImpl.PrintStyle.rawContent())
 					.build();
+		} else if (operation instanceof DescribeTableOperation) {
+			DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+			Optional<CatalogManager.TableLookupResult> result =
+					catalogManager.getTable(describeTableOperation.getSqlIdentifier());
+			if (result.isPresent()) {
+				return buildDescribeResult(result.get().getTable().getSchema());
+			} else {
+				throw new ValidationException(String.format(
+						"Tables or views with the identifier '%s' doesn't exist",
+						describeTableOperation.getSqlIdentifier().asSummaryString()));
+			}
 		} else if (operation instanceof QueryOperation) {
 			return executeInternal((QueryOperation) operation);
 		} else {
@@ -974,10 +992,53 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	}
 
 	private TableResult buildShowResult(String[] objects) {
+		return buildResult(
+			new String[]{"result"},
+			new DataType[]{DataTypes.STRING()},
+			Arrays.stream(objects).map((c) -> new String[]{c}).toArray(String[][]::new));
+	}
+
+	private TableResult buildDescribeResult(TableSchema schema) {
+		Map<String, String> fieldToWatermark =
+				schema.getWatermarkSpecs()
+						.stream()
+						.collect(Collectors.toMap(WatermarkSpec::getRowtimeAttribute, WatermarkSpec::getWatermarkExpr));
+
+		Map<String, String> fieldToPrimaryKey = new HashMap<>();
+		schema.getPrimaryKey().ifPresent((p) -> {
+			List<String> columns = p.getColumns();
+			columns.forEach((c) -> fieldToPrimaryKey.put(c, String.format("PRI(%s)", String.join(", ", columns))));
+		});
+
+		Object[][] rows =
+			schema.getTableColumns()
+				.stream()
+				.map((c) -> {
+					LogicalType logicalType = c.getType().getLogicalType();
+					return new Object[]{
+						c.getName(),
+						StringUtils.removeEnd(logicalType.toString(), " NOT NULL"),
+						logicalType.isNullable(),
+						fieldToPrimaryKey.getOrDefault(c.getName(), null),
+						c.getExpr().orElse(null),
+						fieldToWatermark.getOrDefault(c.getName(), null)};
+				}).toArray(Object[][]::new);
+
+		return buildResult(
+			new String[]{"name", "type", "null", "key", "computed column", "watermark"},
+			new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
+			rows);
+	}
+
+	private TableResult buildResult(String[] headers, DataType[] types, Object[][] rows) {
 		return TableResultImpl.builder()
 				.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-				.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
-				.data(Arrays.stream(objects).map(Row::of).collect(Collectors.toList()))
+				.tableSchema(
+					TableSchema.builder().fields(
+						headers,
+						types).build())
+				.data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
+				.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
 				.build();
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index d04bb2e..5c82f5e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -92,7 +92,9 @@ class TableResultImpl implements TableResult {
 		Iterator<Row> it = collect();
 		if (printStyle instanceof TableauStyle) {
 			int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
-			PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth);
+			String nullColumn = ((TableauStyle) printStyle).getNullColumn();
+			PrintUtils.printAsTableauForm(
+					getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
 		} else if (printStyle instanceof RawContentStyle) {
 			while (it.hasNext()) {
 				System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
@@ -114,7 +116,7 @@ class TableResultImpl implements TableResult {
 		private TableSchema tableSchema = null;
 		private ResultKind resultKind = null;
 		private Iterator<Row> data = null;
-		private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE);
+		private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
 
 		private Builder() {
 		}
@@ -195,12 +197,13 @@ class TableResultImpl implements TableResult {
 	 */
 	public interface PrintStyle {
 		/**
-		 * Create a tableau print style with given max column width,
+		 * Create a tableau print style with given max column width and null column,
 		 * which prints the result schema and content as tableau form.
 		 */
-		static PrintStyle tableau(int maxColumnWidth) {
+		static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
 			Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
-			return new TableauStyle(maxColumnWidth);
+			Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
+			return new TableauStyle(maxColumnWidth, nullColumn);
 		}
 
 		/**
@@ -217,15 +220,22 @@ class TableResultImpl implements TableResult {
 	 * print the result schema and content as tableau form.
 	 */
 	private static final class TableauStyle implements PrintStyle {
+
 		private final int maxColumnWidth;
+		private final String nullColumn;
 
-		private TableauStyle(int maxColumnWidth) {
+		private TableauStyle(int maxColumnWidth, String nullColumn) {
 			this.maxColumnWidth = maxColumnWidth;
+			this.nullColumn = nullColumn;
 		}
 
 		int getMaxColumnWidth() {
 			return maxColumnWidth;
 		}
+
+		String getNullColumn() {
+			return nullColumn;
+		}
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
new file mode 100644
index 0000000..ecd8bc7
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier statement.
+ */
+public class DescribeTableOperation implements Operation {
+
+	private final ObjectIdentifier sqlIdentifier;
+	private final boolean isExtended;
+
+	public DescribeTableOperation(ObjectIdentifier sqlIdentifier, boolean isExtended) {
+		this.sqlIdentifier = sqlIdentifier;
+		this.isExtended = isExtended;
+	}
+
+	public ObjectIdentifier getSqlIdentifier() {
+		return sqlIdentifier;
+	}
+
+	public boolean isExtended() {
+		return isExtended;
+	}
+
+	@Override
+	public String asSummaryString() {
+		Map<String, Object> params = new LinkedHashMap<>();
+		params.put("identifier", sqlIdentifier);
+		params.put("isExtended", isExtended);
+		return OperationUtils.formatWithChildren(
+			"DESCRIBE",
+			params,
+			Collections.emptyList(),
+			Operation::asSummaryString);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
index 4de488e..7916fa1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java
@@ -40,7 +40,7 @@ public class PrintUtils {
 
 	// constants for printing
 	public static final int MAX_COLUMN_WIDTH = 30;
-	private static final String NULL_COLUMN = "(NULL)";
+	public static final String NULL_COLUMN = "(NULL)";
 	private static final String COLUMN_TRUNCATED_FLAG = "...";
 
 	private PrintUtils() {
@@ -65,7 +65,7 @@ public class PrintUtils {
 			TableSchema tableSchema,
 			Iterator<Row> it,
 			PrintWriter printWriter) {
-		printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH);
+		printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
 	}
 
 	/**
@@ -87,14 +87,15 @@ public class PrintUtils {
 			TableSchema tableSchema,
 			Iterator<Row> it,
 			PrintWriter printWriter,
-			int maxColumnWidth) {
+			int maxColumnWidth,
+			String nullColumn) {
 		List<String[]> rows = new ArrayList<>();
 
 		// fill field names first
 		List<TableColumn> columns = tableSchema.getTableColumns();
 		rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
 		while (it.hasNext()) {
-			rows.add(rowToString(it.next()));
+			rows.add(rowToString(it.next(), nullColumn));
 		}
 
 		int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
@@ -123,11 +124,15 @@ public class PrintUtils {
 	}
 
 	public static String[] rowToString(Row row) {
+		return rowToString(row, NULL_COLUMN);
+	}
+
+	public static String[] rowToString(Row row, String nullColumn) {
 		final String[] fields = new String[row.getArity()];
 		for (int i = 0; i < row.getArity(); i++) {
 			final Object field = row.getField(i);
 			if (field == null) {
-				fields[i] = NULL_COLUMN;
+				fields[i] = nullColumn;
 			} else {
 				fields[i] = StringUtils.arrayAwareToString(field);
 			}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 49138d1..11dece9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -39,6 +39,7 @@ import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
 import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -64,6 +65,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -205,6 +207,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
 		} else if (validated instanceof SqlExplain) {
 			return Optional.of(converter.convertExplain((SqlExplain) validated));
+		} else if (validated instanceof SqlRichDescribeTable) {
+			return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -610,6 +614,14 @@ public class SqlToOperationConverter {
 		return new ExplainOperation(operation);
 	}
 
+	/** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */
+	private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+		ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+		return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convertSqlQuery(SqlNode node) {
 		return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 846f536..b38fa92 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan._
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
 import java.lang.{Boolean => JBoolean}
 import java.util
 import java.util.function.{Function => JFunction}
@@ -126,7 +128,8 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowDatabases]
         || sqlNode.isInstanceOf[SqlShowTables]
         || sqlNode.isInstanceOf[SqlShowFunctions]
-        || sqlNode.isInstanceOf[SqlShowViews]) {
+        || sqlNode.isInstanceOf[SqlShowViews]
+        || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 93177a9..a890d7c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -728,9 +728,11 @@ class TableEnvironmentTest {
     val sourceDDL =
       """
         |CREATE TABLE T1(
-        |  a int,
+        |  a int not null,
         |  b varchar,
-        |  c int
+        |  c int,
+        |  ts AS to_timestamp(b),
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
         |) with (
         |  'connector' = 'COLLECTION'
         |)
@@ -975,6 +977,116 @@ class TableEnvironmentTest {
     }
   }
 
+  @Test
+  def testDescribeTableOrView(): Unit = {
+    val sourceDDL =
+      """
+        |CREATE TABLE T1(
+        |  f0 char(10),
+        |  f1 varchar(10),
+        |  f2 string,
+        |  f3 BOOLEAN,
+        |  f4 BINARY(10),
+        |  f5 VARBINARY(10),
+        |  f6 BYTES,
+        |  f7 DECIMAL(10, 3),
+        |  f8 TINYINT,
+        |  f9 SMALLINT,
+        |  f10 INTEGER,
+        |  f11 BIGINT,
+        |  f12 FLOAT,
+        |  f13 DOUBLE,
+        |  f14 DATE,
+        |  f15 TIME,
+        |  f16 TIMESTAMP,
+        |  f17 TIMESTAMP(3),
+        |  f18 TIMESTAMP WITHOUT TIME ZONE,
+        |  f19 TIMESTAMP(3) WITH LOCAL TIME ZONE,
+        |  f20 TIMESTAMP WITH LOCAL TIME ZONE,
+        |  f21 ARRAY<INT>,
+        |  f22 MAP<INT, STRING>,
+        |  f23 ROW<f0 INT, f1 STRING>,
+        |  f24 int not null,
+        |  f25 varchar not null,
+        |  f26 row<f0 int not null, f1 int> not null,
+        |  ts AS to_timestamp(f25),
+        |  PRIMARY KEY(f24, f26) NOT ENFORCED,
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    val viewDDL =
+      """
+        |CREATE VIEW IF NOT EXISTS T2(d, e, f) AS SELECT f24, f25, f26 FROM T1
+      """.stripMargin
+    tableEnv.executeSql(sourceDDL)
+    tableEnv.executeSql(viewDDL)
+
+    val tableResult1 = tableEnv.executeSql("describe T1")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("f0", "CHAR(10)", Boolean.box(true), null, null, null),
+        Row.of("f1", "VARCHAR(10)", Boolean.box(true), null, null, null),
+        Row.of("f2", "STRING", Boolean.box(true), null, null, null),
+        Row.of("f3", "BOOLEAN", Boolean.box(true), null, null, null),
+        Row.of("f4", "BINARY(10)", Boolean.box(true), null, null, null),
+        Row.of("f5", "VARBINARY(10)", Boolean.box(true), null, null, null),
+        Row.of("f6", "BYTES", Boolean.box(true), null, null, null),
+        Row.of("f7", "DECIMAL(10, 3)", Boolean.box(true), null, null, null),
+        Row.of("f8", "TINYINT", Boolean.box(true), null, null, null),
+        Row.of("f9", "SMALLINT", Boolean.box(true), null, null, null),
+        Row.of("f10", "INT", Boolean.box(true), null, null, null),
+        Row.of("f11", "BIGINT", Boolean.box(true), null, null, null),
+        Row.of("f12", "FLOAT", Boolean.box(true), null, null, null),
+        Row.of("f13", "DOUBLE", Boolean.box(true), null, null, null),
+        Row.of("f14", "DATE", Boolean.box(true), null, null, null),
+        Row.of("f15", "TIME(0)", Boolean.box(true), null, null, null),
+        Row.of("f16", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+        Row.of("f17", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+        Row.of("f18", "TIMESTAMP(6)", Boolean.box(true), null, null, null),
+        Row.of("f19", "TIMESTAMP(3) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+        Row.of("f20", "TIMESTAMP(6) WITH LOCAL TIME ZONE", Boolean.box(true), null, null, null),
+        Row.of("f21", "ARRAY<INT>", Boolean.box(true), null, null, null),
+        Row.of("f22", "MAP<INT, STRING>", Boolean.box(true), null, null, null),
+        Row.of("f23", "ROW<`f0` INT, `f1` STRING>", Boolean.box(true), null, null, null),
+        Row.of("f24", "INT", Boolean.box(false), "PRI(f24, f26)", null, null),
+        Row.of("f25", "STRING", Boolean.box(false), null, null, null),
+        Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false),
+          "PRI(f24, f26)", null, null),
+        Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
+          "`ts` - INTERVAL '1' SECOND")
+      ).iterator(),
+      tableResult1.collect())
+
+    val tableResult2 = tableEnv.executeSql("describe T2")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("d", "INT", Boolean.box(false), null, null, null),
+        Row.of("e", "STRING", Boolean.box(false), null, null, null),
+        Row.of("f", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), null, null, null)
+      ).iterator(),
+      tableResult2.collect())
+
+    // temporary view T2(x, y) masks permanent view T2(d, e, f)
+    val temporaryViewDDL =
+      """
+        |CREATE TEMPORARY VIEW IF NOT EXISTS T2(x, y) AS SELECT f24, f25 FROM T1
+      """.stripMargin
+    tableEnv.executeSql(temporaryViewDDL)
+
+    val tableResult3 = tableEnv.executeSql("describe T2")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
+    checkData(
+      util.Arrays.asList(
+        Row.of("x", "INT", Boolean.box(false), null, null, null),
+        Row.of("y", "STRING", Boolean.box(false), null, null, null)).iterator(),
+      tableResult3.collect())
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index a465f56..a0239c4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -36,6 +36,7 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
 import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
 import org.apache.flink.sql.parser.dql.SqlShowDatabases;
 import org.apache.flink.sql.parser.dql.SqlShowFunctions;
@@ -61,6 +62,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
@@ -193,6 +195,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertShowViews((SqlShowViews) validated));
 		} else if (validated instanceof SqlExplain) {
 			return Optional.of(converter.convertExplain((SqlExplain) validated));
+		} else if (validated instanceof SqlRichDescribeTable) {
+			return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -575,6 +579,14 @@ public class SqlToOperationConverter {
 		return new ExplainOperation(operation);
 	}
 
+	/** Convert DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier. */
+	private Operation convertDescribeTable(SqlRichDescribeTable sqlRichDescribeTable) {
+		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlRichDescribeTable.fullTableName());
+		ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+		return new DescribeTableOperation(identifier, sqlRichDescribeTable.isExtended());
+	}
+
 	/**
 	 * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
 	 * fields, say, we have a create table DDL statement:
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index ab89dda..46fe90e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -49,6 +49,8 @@ import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
 
+import org.apache.commons.lang3.StringUtils
+
 import _root_.java.lang.{Iterable => JIterable, Long => JLong}
 import _root_.java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.java.util.{Optional, Collections => JCollections, HashMap => JHashMap, List => JList, Map => JMap}
@@ -140,7 +142,7 @@ abstract class TableEnvImpl(
       "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
       "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, USE CATALOG, USE [CATALOG.]DATABASE, " +
       "SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, " +
-      "SHOW VIEWS, INSERT."
+      "SHOW VIEWS, INSERT, DESCRIBE."
 
   private def isStreamingMode: Boolean = this match {
     case _: BatchTableEnvImpl => false
@@ -626,7 +628,7 @@ abstract class TableEnvImpl(
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .tableSchema(tableSchema)
         .data(tableSink.getResultIterator)
-        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH))
+        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
         .build
     } catch {
       case e: Exception =>
@@ -809,6 +811,15 @@ abstract class TableEnvImpl(
           .data(JCollections.singletonList(Row.of(explanation)))
           .setPrintStyle(PrintStyle.rawContent())
           .build
+      case descOperation: DescribeTableOperation =>
+        val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+        if (result.isPresent) {
+          buildDescribeResult(result.get.getTable.getSchema)
+        } else {
+          throw new ValidationException(String.format(
+            "Table or view with identifier '%s' doesn't exist",
+            descOperation.getSqlIdentifier.asSummaryString()))
+        }
       case queryOperation: QueryOperation =>
         executeInternal(queryOperation)
 
@@ -818,10 +829,51 @@ abstract class TableEnvImpl(
   }
 
   private def buildShowResult(objects: Array[String]): TableResult = {
+    val rows = Array.ofDim[Object](objects.length, 1)
+    objects.zipWithIndex.foreach {
+      case (obj, i) => rows(i)(0) = obj
+    }
+    buildResult(Array("result"), Array(DataTypes.STRING), rows)
+  }
+
+  private def buildDescribeResult(schema: TableSchema): TableResult = {
+    val fieldToWatermark =
+      schema
+        .getWatermarkSpecs
+        .map(w => (w.getRowtimeAttribute, w.getWatermarkExpr)).toMap
+    val fieldToPrimaryKey = new JHashMap[String, String]()
+    if (schema.getPrimaryKey.isPresent) {
+      val columns = schema.getPrimaryKey.get.getColumns.asScala
+      columns.foreach(c => fieldToPrimaryKey.put(c, s"PRI(${columns.mkString(", ")})"))
+    }
+    val data = Array.ofDim[Object](schema.getFieldCount, 6)
+    schema.getTableColumns.asScala.zipWithIndex.foreach {
+      case (c, i) => {
+        val logicalType = c.getType.getLogicalType
+        data(i)(0) = c.getName
+        data(i)(1) = StringUtils.removeEnd(logicalType.toString, " NOT NULL")
+        data(i)(2) = Boolean.box(logicalType.isNullable)
+        data(i)(3) = fieldToPrimaryKey.getOrDefault(c.getName, null)
+        data(i)(4) = c.getExpr.orElse(null)
+        data(i)(5) = fieldToWatermark.getOrDefault(c.getName, null)
+      }
+    }
+    buildResult(
+      Array("name", "type", "null", "key", "compute column", "watermark"),
+      Array(DataTypes.STRING, DataTypes.STRING, DataTypes.BOOLEAN, DataTypes.STRING,
+        DataTypes.STRING, DataTypes.STRING),
+      data)
+  }
+
+  private def buildResult(
+      headers: Array[String],
+      types: Array[DataType],
+      rows: Array[Array[Object]]): TableResult = {
     TableResultImpl.builder()
       .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-      .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
-      .data(objects.map(Row.of(_)).toList)
+      .tableSchema(
+        TableSchema.builder().fields(headers, types).build())
+      .data(rows.map(Row.of(_:_*)).toList)
       .build()
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 3d7dae4..4ac4d09 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
-import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
+import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.CatalogReader
+
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelRoot
@@ -31,6 +32,7 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+
 import _root_.java.lang.{Boolean => JBoolean}
 import _root_.java.util
 import _root_.java.util.function.{Function => JFunction}
@@ -124,7 +126,8 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowDatabases]
         || sqlNode.isInstanceOf[SqlShowTables]
         || sqlNode.isInstanceOf[SqlShowFunctions]
-        || sqlNode.isInstanceOf[SqlShowViews]) {
+        || sqlNode.isInstanceOf[SqlShowViews]
+        || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index dc6ad58..87e0a9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -527,6 +527,34 @@ class BatchTableEnvironmentTest extends TableTestBase {
     }
   }
 
+  @Test
+  def testExecuteSqlWithDescribe(): Unit = {
+    val testUtil = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE tbl1 (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = testUtil.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val tableResult2 = testUtil.tableEnv.executeSql("DESCRIBE tbl1")
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    checkData(
+      java.util.Arrays.asList(
+        Row.of("a", "BIGINT", Boolean.box(true), null, null, null),
+        Row.of("b", "INT", Boolean.box(true), null, null, null),
+        Row.of("c", "STRING", Boolean.box(true), null, null, null)
+      ).iterator(),
+      tableResult2.collect())
+  }
+
   private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
     while (expected.hasNext && actual.hasNext) {
       assertEquals(expected.next(), actual.next())