Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 05:25:54 UTC
[flink] 02/02: [FLINK-13220][table-planner-blink] Add DDL support for blink planner
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit db488c0d19dbb96466b9b765cecc06784b844926
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 21:39:47 2019 +0800
[FLINK-13220][table-planner-blink] Add DDL support for blink planner
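For context, this change lets SQL DDL go through sqlUpdate() on a blink-planner TableEnvironment. A minimal sketch of the intended usage, modeled on the new CatalogTableITCase; the table name, columns, and the 'connector' = 'COLLECTION' property are illustrative and rely on the TestCollectionTableFactory added for tests:

    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.internal.TableEnvironmentImpl

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = TableEnvironmentImpl.create(settings)

    // CREATE TABLE is parsed by FlinkSqlParserImpl and converted to a CreateTableOperation.
    tableEnv.sqlUpdate(
      """
        |create table t1 (
        |  a int,
        |  b varchar
        |) with (
        |  'connector' = 'COLLECTION'
        |)
      """.stripMargin)

    // DROP TABLE is converted to a DropTableOperation.
    tableEnv.sqlUpdate("drop table t1")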
This closes #9093
---
.../table/operations/ddl/DropTableOperation.java | 1 +
flink-table/flink-table-planner-blink/pom.xml | 15 ++
.../flink/table/catalog/DatabaseCalciteSchema.java | 51 +++-
.../apache/flink/table/planner/PlannerContext.java | 4 +
.../table/sqlexec/SqlConversionException.java | 35 +++
.../table/sqlexec/SqlToOperationConverter.java | 31 ++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 23 +-
.../flink/table/calcite/PreValidateReWriter.scala | 201 +++++++++++++++
.../apache/flink/table/planner/PlannerBase.scala | 39 +--
.../org.apache.flink.table.factories.TableFactory | 16 ++
.../flink/table/catalog/CatalogTableITCase.scala | 45 ++--
.../utils/TestCollectionTableFactory.scala | 269 +++++++++++++++++++++
.../table/sqlexec/SqlToOperationConverter.java | 24 +-
.../flink/table/api/internal/TableEnvImpl.scala | 3 +-
.../flink/table/catalog/CatalogTableITCase.scala | 2 +-
15 files changed, 669 insertions(+), 90 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
index e173e02..c9f67db 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
@@ -48,6 +48,7 @@ public class DropTableOperation implements DropOperation {
@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
+ params.put("tableName", tableName);
params.put("IfExists", ifExists);
return OperationUtils.formatWithChildren(
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 9f5946a..dc56f44 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -114,6 +114,18 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-parser</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
@@ -323,6 +335,9 @@ under the License.
<include>com.fasterxml.jackson.core:jackson-annotations</include>
<include>commons-codec:commons-codec</include>
+ <!-- flink-table-planner-blink dependencies -->
+ <include>org.apache.flink.sql.parser:*</include>
+
<!-- flink-table-runtime-blink dependencies -->
<include>org.codehaus.janino:*</include>
</includes>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 8b8b0861..0ed45ba 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -22,11 +22,16 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.operations.DataStreamQueryOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.RichTableSourceQueryOperation;
import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.Schema;
@@ -37,6 +42,7 @@ import org.apache.calcite.schema.Table;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import static java.lang.String.format;
@@ -80,13 +86,9 @@ class DatabaseCalciteSchema extends FlinkSchema {
}
return QueryOperationCatalogViewTable.createCalciteTable(view);
} else if (table instanceof ConnectorCatalogTable) {
- ConnectorCatalogTable<?, ?> connectorTable = (ConnectorCatalogTable<?, ?>) table;
- return connectorTable.getTableSource()
- .map(tableSource -> new TableSourceTable<>(
- tableSource,
- !connectorTable.isBatch(),
- FlinkStatistic.UNKNOWN())
- ).orElseThrow(() -> new TableException("Cannot query a sink only table."));
+ return convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
+ } else if (table instanceof CatalogTable) {
+ return convertCatalogTable(tablePath, (CatalogTable) table);
} else {
throw new TableException("Unsupported table type: " + table);
}
@@ -101,6 +103,41 @@ class DatabaseCalciteSchema extends FlinkSchema {
}
}
+ private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
+ return table.getTableSource()
+ .map(tableSource -> new TableSourceTable<>(
+ tableSource,
+ !table.isBatch(),
+ FlinkStatistic.UNKNOWN()))
+ .orElseThrow(() -> new TableException("Cannot query a sink only table."));
+ }
+
+ private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
+ TableSource<?> tableSource;
+ Optional<TableFactory> tableFactory = catalog.getTableFactory();
+ if (tableFactory.isPresent()) {
+ TableFactory tf = tableFactory.get();
+ if (tf instanceof TableSourceFactory) {
+ tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table);
+ } else {
+ throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
+ catalog.getClass()));
+ }
+ } else {
+ tableSource = TableFactoryUtil.findAndCreateTableSource(table);
+ }
+
+ if (!(tableSource instanceof StreamTableSource)) {
+ throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
+ }
+
+ return new TableSourceTable<>(
+ tableSource,
+ !((StreamTableSource<?>) tableSource).isBounded(),
+ FlinkStatistic.UNKNOWN()
+ );
+ }
+
@Override
public Set<String> getTableNames() {
try {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
index c323ea0..1ed59a9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.planner;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.calcite.CalciteConfig;
import org.apache.flink.table.calcite.CalciteConfig$;
@@ -185,6 +187,8 @@ public class PlannerContext {
// and cases are preserved
() -> SqlParser
.configBuilder()
+ .setParserFactory(FlinkSqlParserImpl.FACTORY)
+ .setConformance(FlinkSqlConformance.DEFAULT)
.setLex(Lex.JAVA)
.setIdentifierMaxLength(256)
.build());
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
new file mode 100644
index 0000000..c49a31d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+/**
+ * Exception thrown during the execution of SQL statements.
+ */
+public class SqlConversionException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SqlConversionException(String message) {
+ super(message);
+ }
+
+ public SqlConversionException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
similarity index 88%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
copy to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index d07cf16..ec36796 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.LogicalTypeDataTypeConverter;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
@@ -73,7 +73,7 @@ public class SqlToOperationConverter {
* SqlNode will have its implementation in the #convert(type) method whose 'type' argument
* is a subclass of {@code SqlNode}.
*
- * @param flinkPlanner FlinkPlannerImpl to convert sql node to rel node
+ * @param flinkPlanner FlinkPlannerImpl to convert the sql node to a rel node
* @param sqlNode SqlNode to execute on
*/
public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
@@ -81,18 +81,21 @@ public class SqlToOperationConverter {
final SqlNode validated = flinkPlanner.validate(sqlNode);
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
if (validated instanceof SqlCreateTable) {
- return converter.convert((SqlCreateTable) validated);
- } else if (validated instanceof SqlDropTable){
- return converter.convert((SqlDropTable) validated);
+ return converter.convertCreateTable((SqlCreateTable) validated);
+ } else if (validated instanceof SqlDropTable) {
+ return converter.convertDropTable((SqlDropTable) validated);
+ } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
+ return converter.convertSqlQuery(validated);
} else {
- return converter.convert(validated);
+ throw new TableException("Unsupported node type "
+ + validated.getClass().getSimpleName());
}
}
/**
* Convert the {@link SqlCreateTable} node.
*/
- private Operation convert(SqlCreateTable sqlCreateTable) {
+ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
// primary key and unique keys are not supported
if ((sqlCreateTable.getPrimaryKeyList() != null
&& sqlCreateTable.getPrimaryKeyList().size() > 0)
@@ -135,18 +138,13 @@ public class SqlToOperationConverter {
}
/** Convert DROP TABLE statement. */
- private Operation convert(SqlDropTable sqlDropTable) {
+ private Operation convertDropTable(SqlDropTable sqlDropTable) {
return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
}
/** Fallback method for sql query. */
- private Operation convert(SqlNode node) {
- if (node.getKind().belongsTo(SqlKind.QUERY)) {
- return toQueryOperation(flinkPlanner, node);
- } else {
- throw new TableException("Should not invoke to node type "
- + node.getClass().getSimpleName());
- }
+ private Operation convertSqlQuery(SqlNode node) {
+ return toQueryOperation(flinkPlanner, node);
}
//~ Tools ------------------------------------------------------------------
@@ -184,7 +182,8 @@ public class SqlToOperationConverter {
final RelDataType relType = column.getType().deriveType(factory,
column.getType().getNullable());
builder.field(column.getName().getSimple(),
- TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)));
+ LogicalTypeDataTypeConverter.fromLogicalTypeToDataType(
+ FlinkTypeFactory.toLogicalType(relType)));
physicalSchema = builder.build();
}
assert physicalSchema != null;
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index c3490f9..c0b0a90 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.calcite
+import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
import com.google.common.collect.ImmutableList
@@ -30,7 +31,7 @@ import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
@@ -97,16 +98,28 @@ class FlinkPlannerImpl(
}
def validate(sqlNode: SqlNode): SqlNode = {
+ val catalogReader = catalogReaderSupplier.apply(false)
+ // do pre-validate rewrite.
+ sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
+ // do extended validation.
+ sqlNode match {
+ case node: ExtendedSqlNode =>
+ node.validate()
+ case _ =>
+ }
+ // no need to validate row type for DDL nodes.
+ if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+ return sqlNode
+ }
validator = new FlinkCalciteSqlValidator(
operatorTable,
- catalogReaderSupplier(false), typeFactory)
+ catalogReader,
+ typeFactory)
validator.setIdentifierExpansion(true)
validator.setDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
-
try {
validator.validate(sqlNode)
- }
- catch {
+ } catch {
case e: RuntimeException =>
throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
new file mode 100644
index 0000000..011c85d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/PreValidateReWriter.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.flink.sql.parser.SqlProperty
+import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.table.calcite.PreValidateReWriter.appendPartitionProjects
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeField}
+import org.apache.calcite.runtime.{CalciteContextException, Resources}
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.util.SqlBasicVisitor
+import org.apache.calcite.sql.validate.{SqlValidatorException, SqlValidatorTable, SqlValidatorUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlLiteral, SqlNode, SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.util.Static.RESOURCE
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/** Implements [[org.apache.calcite.sql.util.SqlVisitor]]
+ * interface to rewrite the SQL node tree before validation. */
+class PreValidateReWriter(
+ val catalogReader: CalciteCatalogReader,
+ val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
+ override def visit(call: SqlCall): Unit = {
+ call match {
+ case r: RichSqlInsert if r.getStaticPartitions.nonEmpty
+ && r.getSource.isInstanceOf[SqlSelect] =>
+ appendPartitionProjects(r, catalogReader, typeFactory,
+ r.getSource.asInstanceOf[SqlSelect], r.getStaticPartitions)
+ case _ =>
+ }
+ }
+}
+
+object PreValidateReWriter {
+ //~ Tools ------------------------------------------------------------------
+ /**
+ * Append the static partitions to the data source projection list. The columns are appended to
+ * the corresponding positions.
+ *
+ * <p>If we have a table A with schema (<a>, <b>, <c>) whose
+ * partition columns are (<a>, <c>), and we get the query
+ * <blockquote><pre>
+ * insert into A partition(a='11', c='22')
+ * select b from B
+ * </pre></blockquote>
+ * The query would be rewritten to:
+ * <blockquote><pre>
+ * insert into A partition(a='11', c='22')
+ * select cast('11' as tpe1), b, cast('22' as tpe2) from B
+ * </pre></blockquote>
+ * where "tpe1" and "tpe2" are the data types of columns a and c of the target table A.
+ *
+ * @param sqlInsert RichSqlInsert instance
+ * @param calciteCatalogReader catalog reader
+ * @param typeFactory type factory
+ * @param select Source sql select
+ * @param partitions Static partition statements
+ */
+ def appendPartitionProjects(sqlInsert: RichSqlInsert,
+ calciteCatalogReader: CalciteCatalogReader,
+ typeFactory: RelDataTypeFactory,
+ select: SqlSelect,
+ partitions: SqlNodeList): Unit = {
+ val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
+ val table = calciteCatalogReader.getTable(names)
+ if (table == null) {
+ // The table does not exist in the current catalog,
+ // just skip and let the subsequent validation raise the error.
+ return
+ }
+ val targetRowType = createTargetRowType(typeFactory,
+ calciteCatalogReader, table, sqlInsert.getTargetColumnList)
+ // validate partition fields first.
+ val assignedFields = new util.LinkedHashMap[Integer, SqlNode]
+ val relOptTable = table match {
+ case t: RelOptTable => t
+ case _ => null
+ }
+ for (node <- partitions.getList) {
+ val sqlProperty = node.asInstanceOf[SqlProperty]
+ val id = sqlProperty.getKey
+ val targetField = SqlValidatorUtil.getTargetField(targetRowType,
+ typeFactory, id, calciteCatalogReader, relOptTable)
+ validateField(assignedFields.containsValue, id, targetField)
+ val value = sqlProperty.getValue.asInstanceOf[SqlLiteral]
+ assignedFields.put(targetField.getIndex,
+ maybeCast(value, value.createSqlType(typeFactory), targetField.getType, typeFactory))
+ }
+ val currentNodes = new util.ArrayList[SqlNode](select.getSelectList.getList)
+ val fixedNodes = new util.ArrayList[SqlNode]
+ 0 until targetRowType.getFieldList.length foreach {
+ idx =>
+ if (assignedFields.containsKey(idx)) {
+ fixedNodes.add(assignedFields.get(idx))
+ } else if (currentNodes.size() > 0) {
+ fixedNodes.add(currentNodes.remove(0))
+ }
+ }
+ // Even though this is an error case, we still append the remaining
+ // projection nodes to the new projection list.
+ if (currentNodes.size > 0) {
+ fixedNodes.addAll(currentNodes)
+ }
+ select.setSelectList(new SqlNodeList(fixedNodes, select.getSelectList.getParserPosition))
+ }
+
+ /**
+ * Derives a row-type for INSERT and UPDATE operations.
+ *
+ * <p>This code snippet is adapted from
+ * [[org.apache.calcite.sql.validate.SqlValidatorImpl#createTargetRowType]].
+ * Ideally this logic would be merged into Apache Calcite,
+ * but that needs time.
+ *
+ * @param typeFactory TypeFactory
+ * @param catalogReader CalciteCatalogReader
+ * @param table Target table for INSERT/UPDATE
+ * @param targetColumnList List of target columns, or null if not specified
+ * @return Rowtype
+ */
+ private def createTargetRowType(
+ typeFactory: RelDataTypeFactory,
+ catalogReader: CalciteCatalogReader,
+ table: SqlValidatorTable,
+ targetColumnList: SqlNodeList): RelDataType = {
+ val baseRowType = table.getRowType
+ if (targetColumnList == null) return baseRowType
+ val fields = new util.ArrayList[util.Map.Entry[String, RelDataType]]
+ val assignedFields = new util.HashSet[Integer]
+ val relOptTable = table match {
+ case t: RelOptTable => t
+ case _ => null
+ }
+ for (node <- targetColumnList) {
+ val id = node.asInstanceOf[SqlIdentifier]
+ val targetField = SqlValidatorUtil.getTargetField(baseRowType,
+ typeFactory, id, catalogReader, relOptTable)
+ validateField(assignedFields.add, id, targetField)
+ fields.add(targetField)
+ }
+ typeFactory.createStructType(fields)
+ }
+
+ /** Check whether the field is valid. **/
+ private def validateField(tester: Function[Integer, Boolean],
+ id: SqlIdentifier,
+ targetField: RelDataTypeField): Unit = {
+ if (targetField == null) {
+ throw newValidationError(id, RESOURCE.unknownTargetColumn(id.toString))
+ }
+ if (!tester.apply(targetField.getIndex)) {
+ throw newValidationError(id, RESOURCE.duplicateTargetColumn(targetField.getName))
+ }
+ }
+
+ private def newValidationError(node: SqlNode,
+ e: Resources.ExInst[SqlValidatorException]): CalciteContextException = {
+ assert(node != null)
+ val pos = node.getParserPosition
+ SqlUtil.newContextException(pos, e)
+ }
+
+ // This code snippet is copied from the SqlValidatorImpl.
+ private def maybeCast(node: SqlNode,
+ currentType: RelDataType,
+ desiredType: RelDataType,
+ typeFactory: RelDataTypeFactory): SqlNode = {
+ if (currentType == desiredType
+ || (currentType.isNullable != desiredType.isNullable
+ && typeFactory.createTypeWithNullability(currentType, desiredType.isNullable)
+ == desiredType)) {
+ node
+ } else {
+ SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO,
+ node, SqlTypeUtil.convertTypeToSpec(desiredType))
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index ef17933..00af797 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -37,6 +37,7 @@ import org.apache.flink.table.plan.optimize.Optimizer
import org.apache.flink.table.plan.reuse.SubplanReuser
import org.apache.flink.table.plan.util.SameRelObjectShuttle
import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink, TableSinkUtils}
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import org.apache.flink.table.util.JavaScalaConversionUtil
@@ -114,24 +115,26 @@ abstract class PlannerBase(
val planner = getFlinkPlanner
// parse the sql query
val parsed = planner.parse(stmt)
- if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
- // validate the sql query
- val validated = planner.validate(parsed)
- // transform to a relational tree
- val relational = planner.rel(validated)
- List(new PlannerQueryOperation(relational.project()))
- } else if (null != parsed && parsed.isInstanceOf[SqlInsert]) {
- val insert = parsed.asInstanceOf[SqlInsert]
- // validate the SQL query
- val query = insert.getSource
- val validatedQuery = planner.validate(query)
- val sinkOperation = new CatalogSinkModifyOperation(
- insert.getTargetTable.asInstanceOf[SqlIdentifier].names,
- new PlannerQueryOperation(planner.rel(validatedQuery).rel)
- )
- List(sinkOperation)
- } else {
- throw new TableException(s"Unsupported query: $stmt")
+ parsed match {
+ case insert: SqlInsert =>
+ // get name of sink table
+ val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+ List(new CatalogSinkModifyOperation(
+ targetTablePath,
+ SqlToOperationConverter.convert(planner,
+ insert.getSource).asInstanceOf[PlannerQueryOperation]).asInstanceOf[Operation])
+ case query if query.getKind.belongsTo(SqlKind.QUERY) =>
+ // validate the sql query
+ val validated = planner.validate(query)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+ // cannot use SqlToOperationConverter because of the project()
+ List(new PlannerQueryOperation(relational.project()))
+ case ddl if ddl.getKind.belongsTo(SqlKind.DDL) =>
+ List(SqlToOperationConverter.convert(planner, ddl))
+ case _ =>
+ throw new TableException(s"Unsupported query: $stmt")
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..45736bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.utils.TestCollectionTableFactory
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
similarity index 92%
copy from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index d8e5ed2..e06ccb2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -18,10 +18,8 @@
package org.apache.flink.table.catalog
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
+import org.apache.flink.table.api.{EnvironmentSettings, ExecutionConfigOptions, TableEnvironment, TableException}
import org.apache.flink.table.factories.utils.TestCollectionTableFactory
import org.apache.flink.types.Row
@@ -38,16 +36,18 @@ import scala.collection.JavaConversions._
@RunWith(classOf[Parameterized])
class CatalogTableITCase(isStreaming: Boolean) {
- private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- private var batchEnv: BatchTableEnvironment = _
- private val streamExec: StreamExecutionEnvironment = StreamExecutionEnvironment
- .getExecutionEnvironment
- private var streamEnv: StreamTableEnvironment = _
+ private val settings = if (isStreaming) {
+ EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+ } else {
+ EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
+ }
+
+ private val tableEnv: TableEnvironment = TableEnvironmentImpl.create(settings)
private val SOURCE_DATA = List(
- toRow(1, "a"),
- toRow(2, "b"),
- toRow(3, "c")
+ toRow(1, "a"),
+ toRow(2, "b"),
+ toRow(3, "c")
)
private val DIM_DATA = List(
@@ -64,10 +64,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
@Before
def before(): Unit = {
- batchExec.setParallelism(4)
- streamExec.setParallelism(4)
- batchEnv = BatchTableEnvironment.create(batchExec)
- streamEnv = StreamTableEnvironment.create(streamExec)
+ tableEnv.getConfig
+ .getConfiguration
+ .setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
TestCollectionTableFactory.reset()
TestCollectionTableFactory.isStreaming = isStreaming
}
@@ -80,20 +79,8 @@ class CatalogTableITCase(isStreaming: Boolean) {
row
}
- def tableEnv: TableEnvironment = {
- if (isStreaming) {
- streamEnv
- } else {
- batchEnv
- }
- }
-
def execJob(name: String) = {
- if (isStreaming) {
- streamExec.execute(name)
- } else {
- batchExec.execute(name)
- }
+ tableEnv.execute(name)
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
new file mode 100644
index 0000000..7f5990a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+
+import java.io.IOException
+import java.util
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+class TestCollectionTableFactory
+ extends StreamTableSourceFactory[Row]
+ with StreamTableSinkFactory[Row]
+ with BatchTableSourceFactory[Row]
+ with BatchTableSinkFactory[Row]
+{
+
+ override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
+ getCollectionSource(properties, isStreaming = TestCollectionTableFactory.isStreaming)
+ }
+
+ override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
+ getCollectionSource(properties, isStreaming = true)
+ }
+
+ override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
+ getCollectionSource(properties, isStreaming = false)
+ }
+
+ override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def requiredContext(): JMap[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR, "COLLECTION")
+ context
+ }
+
+ override def supportedProperties(): JList[String] = {
+ val supported = new JArrayList[String]()
+ supported.add("*")
+ supported
+ }
+}
+
+object TestCollectionTableFactory {
+ var isStreaming: Boolean = true
+
+ val SOURCE_DATA = new JLinkedList[Row]()
+ val DIM_DATA = new JLinkedList[Row]()
+ val RESULT = new JLinkedList[Row]()
+ private var emitIntervalMS = -1L
+
+ def initData(sourceData: JList[Row],
+ dimData: JList[Row] = List(),
+ emitInterval: Long = -1L): Unit ={
+ SOURCE_DATA.addAll(sourceData)
+ DIM_DATA.addAll(dimData)
+ emitIntervalMS = emitInterval
+ }
+
+ def reset(): Unit ={
+ RESULT.clear()
+ SOURCE_DATA.clear()
+ DIM_DATA.clear()
+ emitIntervalMS = -1L
+ }
+
+ def getCollectionSource(props: JMap[String, String],
+ isStreaming: Boolean): CollectionTableSource = {
+ val properties = new DescriptorProperties()
+ properties.putProperties(props)
+ val schema = properties.getTableSchema(Schema.SCHEMA)
+ new CollectionTableSource(emitIntervalMS, schema, isStreaming)
+ }
+
+ def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
+ val properties = new DescriptorProperties()
+ properties.putProperties(props)
+ val schema = properties.getTableSchema(Schema.SCHEMA)
+ new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
+ }
+
+ /**
+ * Table source of collection.
+ */
+ class CollectionTableSource(
+ val emitIntervalMs: Long,
+ val schema: TableSchema,
+ val isStreaming: Boolean)
+ extends BatchTableSource[Row]
+ with StreamTableSource[Row]
+ with LookupableTableSource[Row] {
+
+ private val rowType: TypeInformation[Row] = schema.toRowType
+
+ override def isBounded: Boolean = !isStreaming
+
+ def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+ execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+ SOURCE_DATA,
+ rowType.createSerializer(new ExecutionConfig)),
+ rowType)
+ }
+
+ override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = {
+ streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+ SOURCE_DATA,
+ rowType.createSerializer(new ExecutionConfig)),
+ rowType)
+ }
+
+ override def getReturnType: TypeInformation[Row] = rowType
+
+ override def getTableSchema: TableSchema = {
+ schema
+ }
+
+ override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = {
+ new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_)))
+ }
+
+ override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null
+
+ override def isAsyncEnabled: Boolean = false
+ }
+
+ /**
+ * Table sink of collection.
+ */
+ class CollectionTableSink(val outputType: RowTypeInfo)
+ extends BatchTableSink[Row]
+ with AppendStreamTableSink[Row] {
+ override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+ dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
+ }
+
+ override def getOutputType: RowTypeInfo = outputType
+
+ override def getFieldNames: Array[String] = outputType.getFieldNames
+
+ override def getFieldTypes: Array[TypeInformation[_]] = {
+ outputType.getFieldTypes
+ }
+
+ override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+ }
+
+ override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+ dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+ }
+
+ override def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+ }
+
+ /**
+ * Sink function of unsafe memory.
+ */
+ class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] {
+ private var serializer: TypeSerializer[Row] = _
+
+ override def open(param: Configuration): Unit = {
+ serializer = outputType.createSerializer(new ExecutionConfig)
+ }
+
+ @throws[Exception]
+ override def invoke(row: Row): Unit = {
+ RESULT.add(serializer.copy(row))
+ }
+ }
+
+ /**
+ * Collection inputFormat for testing.
+ */
+ class TestCollectionInputFormat[T](
+ val emitIntervalMs: Long,
+ val dataSet: java.util.Collection[T],
+ val serializer: TypeSerializer[T])
+ extends CollectionInputFormat[T](dataSet, serializer) {
+ @throws[IOException]
+ override def reachedEnd: Boolean = {
+ if (emitIntervalMs > 0) {
+ try
+ Thread.sleep(emitIntervalMs)
+ catch {
+ case _: InterruptedException =>
+ }
+ }
+ super.reachedEnd
+ }
+ }
+
+ /**
+ * Dimension table source fetcher.
+ */
+ class TemporalTableFetcher(
+ val dimData: JLinkedList[Row],
+ val keys: Array[Int]) extends TableFunction[Row] {
+
+ @throws[Exception]
+ def eval(values: Any*): Unit = {
+ for (data <- dimData) {
+ var matched = true
+ var idx = 0
+ while (matched && idx < keys.length) {
+ val dimField = data.getField(keys(idx))
+ val inputField = values(idx)
+ matched = dimField.equals(inputField)
+ idx += 1
+ }
+ if (matched) {
+ // copy the row data
+ val ret = new Row(data.getArity)
+ 0 until data.getArity foreach { idx =>
+ ret.setField(idx, data.getField(idx))
+ }
+ collect(ret)
+ }
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index d07cf16..27718ae 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -81,18 +81,21 @@ public class SqlToOperationConverter {
final SqlNode validated = flinkPlanner.validate(sqlNode);
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
if (validated instanceof SqlCreateTable) {
- return converter.convert((SqlCreateTable) validated);
- } else if (validated instanceof SqlDropTable){
- return converter.convert((SqlDropTable) validated);
+ return converter.convertCreateTable((SqlCreateTable) validated);
+ } else if (validated instanceof SqlDropTable) {
+ return converter.convertDropTable((SqlDropTable) validated);
+ } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
+ return converter.convertSqlQuery(validated);
} else {
- return converter.convert(validated);
+ throw new TableException("Unsupported node type "
+ + validated.getClass().getSimpleName());
}
}
/**
* Convert the {@link SqlCreateTable} node.
*/
- private Operation convert(SqlCreateTable sqlCreateTable) {
+ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
// primary key and unique keys are not supported
if ((sqlCreateTable.getPrimaryKeyList() != null
&& sqlCreateTable.getPrimaryKeyList().size() > 0)
@@ -135,18 +138,13 @@ public class SqlToOperationConverter {
}
/** Convert DROP TABLE statement. */
- private Operation convert(SqlDropTable sqlDropTable) {
+ private Operation convertDropTable(SqlDropTable sqlDropTable) {
return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
}
/** Fallback method for sql query. */
- private Operation convert(SqlNode node) {
- if (node.getKind().belongsTo(SqlKind.QUERY)) {
- return toQueryOperation(flinkPlanner, node);
- } else {
- throw new TableException("Should not invoke to node type "
- + node.getClass().getSimpleName());
- }
+ private Operation convertSqlQuery(SqlNode node) {
+ return toQueryOperation(flinkPlanner, node);
}
//~ Tools ------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 03ef425..3bdfa56 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -477,7 +477,8 @@ abstract class TableEnvImpl(
}
case _ =>
throw new TableException(
- "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+ "Unsupported SQL query! sqlUpdate() only accepts SQL statements of " +
+ "type INSERT, CREATE TABLE, DROP TABLE.")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index d8e5ed2..e5f8a76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.catalog
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.factories.utils.TestCollectionTableFactory
import org.apache.flink.types.Row