You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 05:25:54 UTC

[flink] 02/02: [FLINK-13220][table-planner-blink] Add DDL support for blink planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db488c0d19dbb96466b9b765cecc06784b844926
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 21:39:47 2019 +0800

    [FLINK-13220][table-planner-blink] Add DDL support for blink planner
    
    This closes #9093
---
 .../table/operations/ddl/DropTableOperation.java   |   1 +
 flink-table/flink-table-planner-blink/pom.xml      |  15 ++
 .../flink/table/catalog/DatabaseCalciteSchema.java |  51 +++-
 .../apache/flink/table/planner/PlannerContext.java |   4 +
 .../table/sqlexec/SqlConversionException.java      |  35 +++
 .../table/sqlexec/SqlToOperationConverter.java     |  31 ++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  23 +-
 .../flink/table/calcite/PreValidateReWriter.scala  | 201 +++++++++++++++
 .../apache/flink/table/planner/PlannerBase.scala   |  39 +--
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../flink/table/catalog/CatalogTableITCase.scala   |  45 ++--
 .../utils/TestCollectionTableFactory.scala         | 269 +++++++++++++++++++++
 .../table/sqlexec/SqlToOperationConverter.java     |  24 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |   3 +-
 .../flink/table/catalog/CatalogTableITCase.scala   |   2 +-
 15 files changed, 669 insertions(+), 90 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
index e173e02..c9f67db 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
@@ -48,6 +48,7 @@ public class DropTableOperation implements DropOperation {
 	@Override
 	public String asSummaryString() {
 		Map<String, Object> params = new LinkedHashMap<>();
+		params.put("tableName", tableName);
 		params.put("IfExists", ifExists);
 
 		return OperationUtils.formatWithChildren(
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 9f5946a..dc56f44 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -114,6 +114,18 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-parser</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.calcite</groupId>
+					<artifactId>calcite-core</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
@@ -323,6 +335,9 @@ under the License.
 									<include>com.fasterxml.jackson.core:jackson-annotations</include>
 									<include>commons-codec:commons-codec</include>
 
+									<!-- flink-table-planner-blink dependencies -->
+									<include>org.apache.flink.sql.parser:*</include>
+
 									<!-- flink-table-runtime-blink dependencies -->
 									<include>org.codehaus.janino:*</include>
 								</includes>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 8b8b0861..0ed45ba 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -22,11 +22,16 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.operations.DataStreamQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.RichTableSourceQueryOperation;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.schema.Schema;
@@ -37,6 +42,7 @@ import org.apache.calcite.schema.Table;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static java.lang.String.format;
@@ -80,13 +86,9 @@ class DatabaseCalciteSchema extends FlinkSchema {
 				}
 				return QueryOperationCatalogViewTable.createCalciteTable(view);
 			} else if (table instanceof ConnectorCatalogTable) {
-				ConnectorCatalogTable<?, ?> connectorTable = (ConnectorCatalogTable<?, ?>) table;
-				return connectorTable.getTableSource()
-					.map(tableSource -> new TableSourceTable<>(
-						tableSource,
-						!connectorTable.isBatch(),
-						FlinkStatistic.UNKNOWN())
-					).orElseThrow(() -> new TableException("Cannot query a sink only table."));
+				return convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
+			} else if (table instanceof CatalogTable) {
+				return convertCatalogTable(tablePath, (CatalogTable) table);
 			} else {
 				throw new TableException("Unsupported table type: " + table);
 			}
@@ -101,6 +103,41 @@ class DatabaseCalciteSchema extends FlinkSchema {
 		}
 	}
 
+	private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
+		return table.getTableSource()
+			.map(tableSource -> new TableSourceTable<>(
+				tableSource,
+				!table.isBatch(),
+				FlinkStatistic.UNKNOWN()))
+			.orElseThrow(() -> new TableException("Cannot query a sink only table."));
+	}
+
+	private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
+		TableSource<?> tableSource;
+		Optional<TableFactory> tableFactory = catalog.getTableFactory();
+		if (tableFactory.isPresent()) {
+			TableFactory tf = tableFactory.get();
+			if (tf instanceof TableSourceFactory) {
+				tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
+			} else {
+				throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
+					catalog.getClass()));
+			}
+		} else {
+			tableSource = TableFactoryUtil.findAndCreateTableSource(table);
+		}
+
+		if (!(tableSource instanceof StreamTableSource)) {
+			throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
+		}
+
+		return new TableSourceTable<>(
+			tableSource,
+			!((StreamTableSource<?>) tableSource).isBounded(),
+			FlinkStatistic.UNKNOWN()
+		);
+	}
+
 	@Override
 	public Set<String> getTableNames() {
 		try {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
index c323ea0..1ed59a9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.planner;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfig$;
@@ -185,6 +187,8 @@ public class PlannerContext {
 				// and cases are preserved
 				() -> SqlParser
 						.configBuilder()
+						.setParserFactory(FlinkSqlParserImpl.FACTORY)
+						.setConformance(FlinkSqlConformance.DEFAULT)
 						.setLex(Lex.JAVA)
 						.setIdentifierMaxLength(256)
 						.build());
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
new file mode 100644
index 0000000..c49a31d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+/**
+ * Exception thrown during the execution of SQL statements.
+ */
+public class SqlConversionException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public SqlConversionException(String message) {
+		super(message);
+	}
+
+	public SqlConversionException(String message, Throwable e) {
+		super(message, e);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
similarity index 88%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
copy to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index d07cf16..ec36796 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter;
 
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
@@ -73,7 +73,7 @@ public class SqlToOperationConverter {
 	 * SqlNode will have it's implementation in the #convert(type) method whose 'type' argument
 	 * is subclass of {@code SqlNode}.
 	 *
-	 * @param flinkPlanner     FlinkPlannerImpl to convert sql node to rel node
+	 * @param flinkPlanner     FlinkPlannerImpl to convertCreateTable sql node to rel node
 	 * @param sqlNode          SqlNode to execute on
 	 */
 	public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
@@ -81,18 +81,21 @@ public class SqlToOperationConverter {
 		final SqlNode validated = flinkPlanner.validate(sqlNode);
 		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
 		if (validated instanceof SqlCreateTable) {
-			return converter.convert((SqlCreateTable) validated);
-		} else if (validated instanceof SqlDropTable){
-			return converter.convert((SqlDropTable) validated);
+			return converter.convertCreateTable((SqlCreateTable) validated);
+		} if (validated instanceof SqlDropTable) {
+			return converter.convertDropTable((SqlDropTable) validated);
+		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
+			return converter.convertSqlQuery(validated);
 		} else {
-			return converter.convert(validated);
+			throw new TableException("Unsupported node type "
+				+ validated.getClass().getSimpleName());
 		}
 	}
 
 	/**
 	 * Convert the {@link SqlCreateTable} node.
 	 */
-	private Operation convert(SqlCreateTable sqlCreateTable) {
+	private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
 		// primary key and unique keys are not supported
 		if ((sqlCreateTable.getPrimaryKeyList() != null
 				&& sqlCreateTable.getPrimaryKeyList().size() > 0)
@@ -135,18 +138,13 @@ public class SqlToOperationConverter {
 	}
 
 	/** Convert DROP TABLE statement. */
-	private Operation convert(SqlDropTable sqlDropTable) {
+	private Operation convertDropTable(SqlDropTable sqlDropTable) {
 		return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
 	}
 
 	/** Fallback method for sql query. */
-	private Operation convert(SqlNode node) {
-		if (node.getKind().belongsTo(SqlKind.QUERY)) {
-			return toQueryOperation(flinkPlanner, node);
-		} else {
-			throw new TableException("Should not invoke to node type "
-				+ node.getClass().getSimpleName());
-		}
+	private Operation convertSqlQuery(SqlNode node) {
+		return toQueryOperation(flinkPlanner, node);
 	}
 
 	//~ Tools ------------------------------------------------------------------
@@ -184,7 +182,8 @@ public class SqlToOperationConverter {
 			final RelDataType relType = column.getType().deriveType(factory,
 				column.getType().getNullable());
 			builder.field(column.getName().getSimple(),
-				TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)));
+				LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+					FlinkTypeFactory.toLogicalType(relType)));
 			physicalSchema = builder.build();
 		}
 		assert physicalSchema != null;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index c3490f9..c0b0a90 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.calcite
 
+import org.apache.flink.sql.parser.ExtendedSqlNode
 import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
 
 import com.google.common.collect.ImmutableList
@@ -30,7 +31,7 @@ import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
 import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 
@@ -97,16 +98,28 @@ class FlinkPlannerImpl(
   }
 
   def validate(sqlNode: SqlNode): SqlNode = {
+    val catalogReader = catalogReaderSupplier.apply(false)
+    // do pre-validate rewrite.
+    sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
+    // do extended validation.
+    sqlNode match {
+      case node: ExtendedSqlNode =>
+        node.validate()
+      case _ =>
+    }
+    // no need to validate row type for DDL nodes.
+    if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+      return sqlNode
+    }
     validator = new FlinkCalciteSqlValidator(
       operatorTable,
-      catalogReaderSupplier(false), typeFactory)
+      catalogReader,
+      typeFactory)
     validator.setIdentifierExpansion(true)
     validator.setDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
-
     try {
       validator.validate(sqlNode)
-    }
-    catch {
+    } catch {
       case e: RuntimeException =>
         throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
new file mode 100644
index 0000000..011c85d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.sql.parser.SqlProperty
+import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.table.calcite.PreValidateReWriter.appendPartitionProjects
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
+import org.apache.calcite.runtime.{CalciteContextException, Resources}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.util.SqlBasicVisitor
+import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.util.Static.RESOURCE
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
+  * interface to do some rewrite work before sql node validation. */
+class PreValidateReWriter(
+  val catalogReader: CalciteCatalogReader,
+  val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
+  override def visit(call: SqlCall): Unit = {
+    call match {
+      case r: RichSqlInsert if r.getStaticPartitions.nonEmpty
+        && r.getSource.isInstanceOf[SqlSelect] =>
+        appendPartitionProjects(r, catalogReader, typeFactory,
+          r.getSource.asInstanceOf[SqlSelect], r.getStaticPartitions)
+      case _ =>
+    }
+  }
+}
+
+object PreValidateReWriter {
+  //~ Tools ------------------------------------------------------------------
+  /**
+    * Append the static partitions to the data source projection list. The columns are appended to
+    * the corresponding positions.
+    *
+    * <p>If we have a table A with schema (&lt;a&gt;, &lt;b&gt;, &lt;c&gt) whose
+    * partition columns are (&lt;a&gt;, &lt;c&gt;), and got a query
+    * <blockquote><pre>
+    * insert into A partition(a='11', c='22')
+    * select b from B
+    * </pre></blockquote>
+    * The query would be rewritten to:
+    * <blockquote><pre>
+    * insert into A partition(a='11', c='22')
+    * select cast('11' as tpe1), b, cast('22' as tpe2) from B
+    * </pre></blockquote>
+    * Where the "tpe1" and "tpe2" are data types of column a and c of target table A.
+    *
+    * @param sqlInsert            RichSqlInsert instance
+    * @param calciteCatalogReader catalog reader
+    * @param typeFactory          type factory
+    * @param select               Source sql select
+    * @param partitions           Static partition statements
+    */
+  def appendPartitionProjects(sqlInsert: RichSqlInsert,
+    calciteCatalogReader: CalciteCatalogReader,
+    typeFactory: RelDataTypeFactory,
+    select: SqlSelect,
+    partitions: SqlNodeList): Unit = {
+    val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+    val table = calciteCatalogReader.getTable(names)
+    if (table == null) {
+      // There is no table exists in current catalog,
+      // just skip to let other validation error throw.
+      return
+    }
+    val targetRowType = createTargetRowType(typeFactory,
+      calciteCatalogReader, table, sqlInsert.getTargetColumnList)
+    // validate partition fields first.
+    val assignedFields = new util.LinkedHashMap[Integer, SqlNode]
+    val relOptTable = table match {
+      case t: RelOptTable => t
+      case _ => null
+    }
+    for (node <- partitions.getList) {
+      val sqlProperty = node.asInstanceOf[SqlProperty]
+      val id = sqlProperty.getKey
+      val targetField = SqlValidatorUtil.getTargetField(targetRowType,
+        typeFactory, id, calciteCatalogReader, relOptTable)
+      validateField(assignedFields.containsValue, id, targetField)
+      val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
+      assignedFields.put(targetField.getIndex,
+        maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
+    }
+    val currentNodes = new util.ArrayList[SqlNode](select.getSelectList.getList)
+    val fixedNodes = new util.ArrayList[SqlNode]
+    0 until targetRowType.getFieldList.length foreach {
+      idx =>
+        if (assignedFields.containsKey(idx)) {
+          fixedNodes.add(assignedFields.get(idx))
+        } else if (currentNodes.size() > 0) {
+          fixedNodes.add(currentNodes.remove(0))
+        }
+    }
+    // Although it is error case, we still append the old remaining
+    // projection nodes to new projection.
+    if (currentNodes.size > 0) {
+      fixedNodes.addAll(currentNodes)
+    }
+    select.setSelectList(new SqlNodeList(fixedNodes, select.getSelectList.getParserPosition))
+  }
+
+  /**
+    * Derives a row-type for INSERT and UPDATE operations.
+    *
+    * <p>This code snippet is almost inspired by
+    * [[org.apache.calcite.sql.validate.SqlValidatorImpl#createTargetRowType]].
+    * It is the best that the logic can be merged into Apache Calcite,
+    * but this needs time.
+    *
+    * @param typeFactory      TypeFactory
+    * @param catalogReader    CalciteCatalogReader
+    * @param table            Target table for INSERT/UPDATE
+    * @param targetColumnList List of target columns, or null if not specified
+    * @return Rowtype
+    */
+  private def createTargetRowType(
+    typeFactory: RelDataTypeFactory,
+    catalogReader: CalciteCatalogReader,
+    table: SqlValidatorTable,
+    targetColumnList: SqlNodeList): RelDataType = {
+    val baseRowType = table.getRowType
+    if (targetColumnList == null) return baseRowType
+    val fields = new util.ArrayList[util.Map.Entry[String, RelDataType]]
+    val assignedFields = new util.HashSet[Integer]
+    val relOptTable = table match {
+      case t: RelOptTable => t
+      case _ => null
+    }
+    for (node <- targetColumnList) {
+      val id = node.asInstanceOf[SqlIdentifier]
+      val targetField = SqlValidatorUtil.getTargetField(baseRowType,
+        typeFactory, id, catalogReader, relOptTable)
+      validateField(assignedFields.add, id, targetField)
+      fields.add(targetField)
+    }
+    typeFactory.createStructType(fields)
+  }
+
+  /** Check whether the field is valid. **/
+  private def validateField(tester: Function[Integer, Boolean],
+    id: SqlIdentifier,
+    targetField: RelDataTypeField): Unit = {
+    if (targetField == null) {
+      throw newValidationError(id, RESOURCE.unknownTargetColumn(id.toString))
+    }
+    if (!tester.apply(targetField.getIndex)) {
+      throw newValidationError(id, RESOURCE.duplicateTargetColumn(targetField.getName))
+    }
+  }
+
+  private def newValidationError(node: SqlNode,
+    e: Resources.ExInst[SqlValidatorException]): CalciteContextException = {
+    assert(node != null)
+    val pos = node.getParserPosition
+    SqlUtil.newContextException(pos, e)
+  }
+
+  // This code snippet is copied from the SqlValidatorImpl.
+  private def maybeCast(node: SqlNode,
+    currentType: RelDataType,
+    desiredType: RelDataType,
+    typeFactory: RelDataTypeFactory): SqlNode = {
+    if (currentType == desiredType
+      || (currentType.isNullable != desiredType.isNullable
+      && typeFactory.createTypeWithNullability(currentType, desiredType.isNullable)
+      == desiredType)) {
+      node
+    } else {
+      SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO,
+        node, SqlTypeUtil.convertTypeToSpec(desiredType))
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index ef17933..00af797 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -37,6 +37,7 @@ import org.apache.flink.table.plan.optimize.Optimizer
 import org.apache.flink.table.plan.reuse.SubplanReuser
 import org.apache.flink.table.plan.util.SameRelObjectShuttle
 import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
 
@@ -114,24 +115,26 @@ abstract class PlannerBase(
     val planner = getFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(stmt)
-    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
-      // validate the sql query
-      val validated = planner.validate(parsed)
-      // transform to a relational tree
-      val relational = planner.rel(validated)
-      List(new PlannerQueryOperation(relational.project()))
-    } else if (null != parsed && parsed.isInstanceOf[SqlInsert]) {
-      val insert = parsed.asInstanceOf[SqlInsert]
-      // validate the SQL query
-      val query = insert.getSource
-      val validatedQuery = planner.validate(query)
-      val sinkOperation = new CatalogSinkModifyOperation(
-        insert.getTargetTable.asInstanceOf[SqlIdentifier].names,
-        new PlannerQueryOperation(planner.rel(validatedQuery).rel)
-      )
-      List(sinkOperation)
-    } else {
-      throw new TableException(s"Unsupported query: $stmt")
+    parsed match {
+      case insert: SqlInsert =>
+        // get name of sink table
+        val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+        List(new CatalogSinkModifyOperation(
+          targetTablePath,
+          SqlToOperationConverter.convert(planner,
+            insert.getSource).asInstanceOf[PlannerQueryOperation]).asInstanceOf[Operation])
+      case query if query.getKind.belongsTo(SqlKind.QUERY) =>
+        // validate the sql query
+        val validated = planner.validate(query)
+        // transform to a relational tree
+        val relational = planner.rel(validated)
+        // can not use SqlToOperationConverter because of the project()
+        List(new PlannerQueryOperation(relational.project()))
+      case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
+        List(SqlToOperationConverter.convert(planner, ddl))
+      case _ =>
+        throw new TableException(s"Unsupported query: $stmt")
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..45736bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.utils.TestCollectionTableFactory
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
similarity index 92%
copy from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index d8e5ed2..e06ccb2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.table.catalog
 
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
+import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, TableEnvironment, TableException}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory
 import org.apache.flink.types.Row
 
@@ -38,16 +36,18 @@ import scala.collection.JavaConversions._
 @RunWith(classOf[Parameterized])
 class CatalogTableITCase(isStreaming: Boolean) {
 
-  private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-  private var batchEnv: BatchTableEnvironment = _
-  private val streamExec: StreamExecutionEnvironment = StreamExecutionEnvironment
-    .getExecutionEnvironment
-  private var streamEnv: StreamTableEnvironment = _
+  private val settings = if (isStreaming) {
+    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+  } else {
+    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
+  }
+
+  private val tableEnv: TableEnvironment = TableEnvironmentImpl.create(settings)
 
   private val SOURCE_DATA = List(
-      toRow(1, "a"),
-      toRow(2, "b"),
-      toRow(3, "c")
+    toRow(1, "a"),
+    toRow(2, "b"),
+    toRow(3, "c")
   )
 
   private val DIM_DATA = List(
@@ -64,10 +64,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
 
   @Before
   def before(): Unit = {
-    batchExec.setParallelism(4)
-    streamExec.setParallelism(4)
-    batchEnv = BatchTableEnvironment.create(batchExec)
-    streamEnv = StreamTableEnvironment.create(streamExec)
+    tableEnv.getConfig
+      .getConfiguration
+      .setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
     TestCollectionTableFactory.reset()
     TestCollectionTableFactory.isStreaming = isStreaming
   }
@@ -80,20 +79,8 @@ class CatalogTableITCase(isStreaming: Boolean) {
     row
   }
 
-  def tableEnv: TableEnvironment = {
-    if (isStreaming) {
-      streamEnv
-    } else {
-      batchEnv
-    }
-  }
-
   def execJob(name: String) = {
-    if (isStreaming) {
-      streamExec.execute(name)
-    } else {
-      batchExec.execute(name)
-    }
+    tableEnv.execute(name)
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
new file mode 100644
index 0000000..7f5990a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+
+import java.io.IOException
+import java.util
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+class TestCollectionTableFactory
+  extends StreamTableSourceFactory[Row]
+    with StreamTableSinkFactory[Row]
+    with BatchTableSourceFactory[Row]
+    with BatchTableSinkFactory[Row]
+{
+
+  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
+    getCollectionSource(properties, isStreaming = TestCollectionTableFactory.isStreaming)
+  }
+
+  override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
+    getCollectionSource(properties, isStreaming = true)
+  }
+
+  override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
+    getCollectionSource(properties, isStreaming = false)
+  }
+
+  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def requiredContext(): JMap[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR, "COLLECTION")
+    context
+  }
+
+  override def supportedProperties(): JList[String] = {
+    val supported = new JArrayList[String]()
+    supported.add("*")
+    supported
+  }
+}
+
+object TestCollectionTableFactory {
+  var isStreaming: Boolean = true
+
+  val SOURCE_DATA = new JLinkedList[Row]()
+  val DIM_DATA = new JLinkedList[Row]()
+  val RESULT = new JLinkedList[Row]()
+  private var emitIntervalMS = -1L
+
+  def initData(sourceData: JList[Row],
+    dimData: JList[Row] = List(),
+    emitInterval: Long = -1L): Unit ={
+    SOURCE_DATA.addAll(sourceData)
+    DIM_DATA.addAll(dimData)
+    emitIntervalMS = emitInterval
+  }
+
+  def reset(): Unit ={
+    RESULT.clear()
+    SOURCE_DATA.clear()
+    DIM_DATA.clear()
+    emitIntervalMS = -1L
+  }
+
+  def getCollectionSource(props: JMap[String, String],
+    isStreaming: Boolean): CollectionTableSource = {
+    val properties = new DescriptorProperties()
+    properties.putProperties(props)
+    val schema = properties.getTableSchema(Schema.SCHEMA)
+    new CollectionTableSource(emitIntervalMS, schema, isStreaming)
+  }
+
+  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
+    val properties = new DescriptorProperties()
+    properties.putProperties(props)
+    val schema = properties.getTableSchema(Schema.SCHEMA)
+    new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
+  }
+
+  /**
+    * Table source of collection.
+    */
+  class CollectionTableSource(
+    val emitIntervalMs: Long,
+    val schema: TableSchema,
+    val isStreaming: Boolean)
+    extends BatchTableSource[Row]
+      with StreamTableSource[Row]
+      with LookupableTableSource[Row] {
+
+    private val rowType: TypeInformation[Row] = schema.toRowType
+
+    override def isBounded: Boolean = !isStreaming
+
+    def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+      execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+        SOURCE_DATA,
+        rowType.createSerializer(new ExecutionConfig)),
+        rowType)
+    }
+
+    override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = {
+      streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+        SOURCE_DATA,
+        rowType.createSerializer(new ExecutionConfig)),
+        rowType)
+    }
+
+    override def getReturnType: TypeInformation[Row] = rowType
+
+    override def getTableSchema: TableSchema = {
+      schema
+    }
+
+    override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = {
+      new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_)))
+    }
+
+    override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null
+
+    override def isAsyncEnabled: Boolean = false
+  }
+
+  /**
+    * Table sink of collection.
+    */
+  class CollectionTableSink(val outputType: RowTypeInfo)
+    extends BatchTableSink[Row]
+      with AppendStreamTableSink[Row] {
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
+    }
+
+    override def getOutputType: RowTypeInfo = outputType
+
+    override def getFieldNames: Array[String] = outputType.getFieldNames
+
+    override def getFieldTypes: Array[TypeInformation[_]] = {
+      outputType.getFieldTypes
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+    }
+
+    override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+      dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+    }
+
+    override def configure(fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+  }
+
+  /**
+    * Sink function of unsafe memory.
+    */
+  class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] {
+    private var serializer: TypeSerializer[Row] = _
+
+    override def open(param: Configuration): Unit = {
+      serializer = outputType.createSerializer(new ExecutionConfig)
+    }
+
+    @throws[Exception]
+    override def invoke(row: Row): Unit = {
+      RESULT.add(serializer.copy(row))
+    }
+  }
+
+  /**
+    * Collection inputFormat for testing.
+    */
+  class TestCollectionInputFormat[T](
+    val emitIntervalMs: Long,
+    val dataSet: java.util.Collection[T],
+    val serializer: TypeSerializer[T])
+    extends CollectionInputFormat[T](dataSet, serializer) {
+    @throws[IOException]
+    override def reachedEnd: Boolean = {
+      if (emitIntervalMs > 0) {
+        try
+          Thread.sleep(emitIntervalMs)
+        catch {
+          case _: InterruptedException =>
+        }
+      }
+      super.reachedEnd
+    }
+  }
+
+  /**
+    * Dimension table source fetcher.
+    */
+  class TemporalTableFetcher(
+    val dimData: JLinkedList[Row],
+    val keys: Array[Int]) extends TableFunction[Row] {
+
+    @throws[Exception]
+    def eval(values: Any*): Unit = {
+      for (data <- dimData) {
+        var matched = true
+        var idx = 0
+        while (matched && idx < keys.length) {
+          val dimField = data.getField(keys(idx))
+          val inputField = values(idx)
+          matched = dimField.equals(inputField)
+          idx += 1
+        }
+        if (matched) {
+          // copy the row data
+          val ret = new Row(data.getArity)
+          0 until data.getArity foreach { idx =>
+            ret.setField(idx, data.getField(idx))
+          }
+          collect(ret)
+        }
+      }
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index d07cf16..27718ae 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -81,18 +81,21 @@ public class SqlToOperationConverter {
 		final SqlNode validated = flinkPlanner.validate(sqlNode);
 		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
 		if (validated instanceof SqlCreateTable) {
-			return converter.convert((SqlCreateTable) validated);
-		} else if (validated instanceof SqlDropTable){
-			return converter.convert((SqlDropTable) validated);
+			return converter.convertCreateTable((SqlCreateTable) validated);
+		} if (validated instanceof SqlDropTable) {
+			return converter.convertDropTable((SqlDropTable) validated);
+		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
+			return converter.convertSqlQuery(validated);
 		} else {
-			return converter.convert(validated);
+			throw new TableException("Unsupported node type "
+				+ validated.getClass().getSimpleName());
 		}
 	}
 
 	/**
 	 * Convert the {@link SqlCreateTable} node.
 	 */
-	private Operation convert(SqlCreateTable sqlCreateTable) {
+	private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
 		// primary key and unique keys are not supported
 		if ((sqlCreateTable.getPrimaryKeyList() != null
 				&& sqlCreateTable.getPrimaryKeyList().size() > 0)
@@ -135,18 +138,13 @@ public class SqlToOperationConverter {
 	}
 
 	/** Convert DROP TABLE statement. */
-	private Operation convert(SqlDropTable sqlDropTable) {
+	private Operation convertDropTable(SqlDropTable sqlDropTable) {
 		return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
 	}
 
 	/** Fallback method for sql query. */
-	private Operation convert(SqlNode node) {
-		if (node.getKind().belongsTo(SqlKind.QUERY)) {
-			return toQueryOperation(flinkPlanner, node);
-		} else {
-			throw new TableException("Should not invoke to node type "
-				+ node.getClass().getSimpleName());
-		}
+	private Operation convertSqlQuery(SqlNode node) {
+		return toQueryOperation(flinkPlanner, node);
 	}
 
 	//~ Tools ------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 03ef425..3bdfa56 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -477,7 +477,8 @@ abstract class TableEnvImpl(
         }
       case _ =>
         throw new TableException(
-          "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+          "Unsupported SQL query! sqlUpdate() only accepts SQL statements of " +
+            "type INSERT, CREATE TABLE, DROP TABLE.")
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index d8e5ed2..e5f8a76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.catalog
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.table.factories.utils.TestCollectionTableFactory
 import org.apache.flink.types.Row