You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/09 14:31:04 UTC

[flink] branch master updated: [FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e54cb  [FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink
77e54cb is described below

commit 77e54cbf9e2f0ae2bfb59bb0ea54793606134c49
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jun 5 19:11:54 2019 +0800

    [FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink
    
    This closes #8844
---
 flink-python/pyflink/table/table_environment.py    |  63 +++
 .../flink/sql/parser/ddl/SqlCreateTable.java       |   4 +
 .../apache/flink/table/api/TableEnvironment.java   |  71 ++++
 .../table/api/internal/TableEnvironmentImpl.java   |  78 +++-
 .../apache/flink/table/catalog/CatalogManager.java |  42 ++
 .../flink/table/catalog/CatalogTableImpl.java      |   5 +-
 .../table/operations/ddl/CreateOperation.java      |  34 ++
 .../table/operations/ddl/CreateTableOperation.java |  70 ++++
 .../flink/table/catalog/CatalogCalciteSchema.java  |   6 +-
 .../table/catalog/CatalogManagerCalciteSchema.java |   2 +-
 .../flink/table/catalog/DatabaseCalciteSchema.java |  10 +-
 .../table/sqlexec/SqlConversionException.java      |  35 ++
 .../table/sqlexec/SqlExecutableStatements.java     |  79 ----
 .../table/sqlexec/SqlToOperationConverter.java     | 193 +++++++++
 .../flink/table/api/internal/TableEnvImpl.scala    |  89 +++-
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  42 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  45 +--
 .../table/catalog/DatabaseCalciteSchemaTest.java   |   2 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  98 +++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../flink/table/catalog/CatalogTableITCase.scala   | 450 +++++++++++++++++++++
 .../utils/TestCollectionTableFactory.scala         | 265 ++++++++++++
 .../flink/table/utils/MockTableEnvironment.scala   |   2 +
 23 files changed, 1537 insertions(+), 149 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 3c8b44c..4b248f9 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -235,6 +235,69 @@ class TableEnvironment(object):
         """
         return self._j_tenv.explain(table._j_table)
 
+    def sql(self, query):
+        """
+        Evaluates single sql statement including DDLs and DMLs.
+
+        Note: Always use this interface to execute a sql query. It only supports
+        to execute one sql statement a time.
+
+        A DDL statement can execute to create/drop a table/view:
+        For example, the below DDL statement would create a CSV table named `tbl1`
+        into the current catalog:
+
+        create table tbl1(
+            a int,
+            b bigint,
+            c varchar
+        ) with (
+            connector = 'csv',
+            csv.path = 'xxx'
+        )
+
+        The returns table format for different kind of statement:
+        DDL: returns null.
+        DML: a sql insert returns null; a sql query(select) returns a table
+        to describe the query data set, it can be further queried through the Table API,
+        or directly write to sink with `~Table.insert_into`.
+
+        SQL queries can directly execute as follows:
+        ::
+        >>> sinkDDL =
+            "create table sinkTable(
+                a int,
+                b varchar
+            ) with (
+                connector = 'csv',
+                csv.path = 'xxx'
+            )"
+
+        >>> sourceDDL =
+            "create table sourceTable(
+                a int,
+                b varchar
+            ) with (
+                connector = 'kafka',
+                kafka.topic = 'xxx',
+                kafka.endpoint = 'x.x.x'
+            )"
+
+        query = "INSERT INTO sinkTable SELECT FROM sourceTable"
+
+        tEnv.sql(sourceDDL)
+        tEnv.sql(sinkDDL)
+        tEnv.sql(query)
+        tEnv.execute("MyJob")
+
+        This code snippet creates a job to read data from Kafka source into a CSV sink.
+
+        :param query: The SQL statement to evaluate.
+        """
+        j_table = self._j_tenv.sql(query)
+        if j_table is None:
+            return None
+        return Table(j_table)
+
     def sql_query(self, query):
         """
         Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 84e4ca2..ec9e562 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -123,6 +123,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 		return comment;
 	}
 
+	public boolean isIfNotExists() {
+		return ifNotExists;
+	}
+
 	public void validate() throws SqlParseException {
 		Set<String> columnNames = new HashSet<>();
 		if (columnList != null) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 10505ac..535a6bc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
+import javax.annotation.Nullable;
+
 import java.util.Optional;
 
 /**
@@ -321,6 +323,69 @@ public interface TableEnvironment {
 	String[] getCompletionHints(String statement, int position);
 
 	/**
+	 * Evaluates single sql statement including DDLs and DMLs.
+	 *
+	 * <p>Note: Always use this interface to execute a sql query. It only supports
+	 * to execute one sql statement a time.
+	 *
+	 * <p>A DDL statement can execute to create/drop a table/view:
+	 * For example, the below DDL statement would create a CSV table named `tbl1`
+	 * into the current catalog:
+	 * <blockquote><pre>
+	 *   create table tbl1(
+	 *     a int,
+	 *     b bigint,
+	 *     c varchar
+	 *   ) with (
+	 *     connector = 'csv',
+	 *     csv.path = 'xxx'
+	 *   )
+	 * </pre></blockquote>
+	 *
+	 * <p>The returns table format for different kind of statement:
+	 * <ul>
+	 *   <li>DDL: returns null.</li>
+	 *   <li>DML: a sql insert returns null; a sql query(select) returns
+	 *   a table to describe the query data set, it can be further queried through
+	 *   the Table API, or directly write to sink with
+	 *   {@link #insertInto(Table, String, String...)}.</li>
+	 * </ul>
+	 *
+	 * <p>SQL queries can directly execute as follows:
+	 *
+	 * <blockquote><pre>
+	 *   String sinkDDL = "create table sinkTable(
+	 *                       a int,
+	 *                       b varchar
+	 *                     ) with (
+	 *                       connector = 'csv',
+	 *                       csv.path = 'xxx'
+	 *                     )";
+	 *
+	 *  String sourceDDL ="create table sourceTable(
+	 *                       a int,
+	 *                       b varchar
+	 *                     ) with (
+	 *                       connector = 'kafka',
+	 *                       kafka.topic = 'xxx',
+	 *                       kafka.endpoint = 'x.x.x'
+	 *                     )";
+	 *
+	 *  String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
+	 *
+	 *  tEnv.sql(sourceDDL);
+	 *  tEnv.sql(sinkDDL);
+	 *  tEnv.sql(query);
+	 *  tEnv.execute("MyJob");
+	 * </pre></blockquote>
+	 * This code snippet creates a job to read data from Kafka source into a CSV sink.
+	 *
+	 * @param statement The SQL statement to evaluate.
+	 */
+	@Nullable
+	Table sql(String statement);
+
+	/**
 	 * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
 	 *
 	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
@@ -337,9 +402,12 @@ public interface TableEnvironment {
 	 * }
 	 * </pre>
 	 *
+	 * @deprecated Use {@link #sql(String)}.
+	 *
 	 * @param query The SQL query to evaluate.
 	 * @return The result of the query as Table
 	 */
+	@Deprecated
 	Table sqlQuery(String query);
 
 	/**
@@ -362,8 +430,11 @@ public interface TableEnvironment {
 	 * }
 	 * </pre>
 	 *
+	 * @deprecated Use {@link #sql(String)}.
+	 *
 	 * @param stmt The SQL statement to evaluate.
 	 */
+	@Deprecated
 	void sqlUpdate(String stmt);
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 7d509b8..4817dcb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -56,10 +56,12 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.ArrayList;
@@ -189,7 +191,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		}
 
 		CatalogBaseTable tableTable = new QueryOperationCatalogView(table.getQueryOperation());
-		registerTableInternal(name, tableTable);
+		registerTableInternal(new String[] { name }, tableTable, false);
 	}
 
 	@Override
@@ -291,6 +293,40 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
+	public Table sql(String statement) {
+		List<Operation> operations = planner.parse(statement);
+		Preconditions.checkState(operations.size() == 1,
+			"sql() only accepts a single SQL statement a time.");
+		Operation operation = operations.get(0);
+		if (operation instanceof CreateTableOperation) {
+			CreateTableOperation createTableOperation = (CreateTableOperation) operation;
+			registerTableInternal(
+				createTableOperation.getTablePath(),
+				createTableOperation.getCatalogTable(),
+				createTableOperation.isIgnoreIfExists());
+			// returns null for DDL statement now.
+			return null;
+		} else if (operation instanceof ModifyOperation) {
+			List<ModifyOperation> modifyOperations =
+				Collections.singletonList((ModifyOperation) operation);
+			if (isEagerOperationTranslation()) {
+				translate(modifyOperations);
+			} else {
+				buffer(modifyOperations);
+			}
+			// returns null for SQL INSERT statement now.
+			return null;
+		} else if (operation instanceof QueryOperation){
+			return createTable((QueryOperation) operation);
+		} else {
+			throw new ValidationException(
+				"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
+					"SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY; or DDL of type " +
+					"CREATE TABLE, CREATE VIEW; or DML of type INSERT INTO.");
+		}
+	}
+
+	@Override
 	public Table sqlQuery(String query) {
 		List<Operation> operations = planner.parse(query);
 
@@ -418,17 +454,31 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		bufferedModifyOperations.addAll(modifyOperations);
 	}
 
-	protected void registerTableInternal(String name, CatalogBaseTable table) {
+	/**
+	 * Registers a {@link CatalogBaseTable} under a given object path. The {@code path} could be
+	 * 3 formats:
+	 * <ol>
+	 *   <li>`catalog.db.table`: A full table path including the catalog name,
+	 *   the database name and table name.</li>
+	 *   <li>`db.table`: database name following table name, with the current catalog name.</li>
+	 *   <li>`table`: Only the table name, with the current catalog name and database  name.</li>
+	 * </ol>
+	 * The registered tables then can be referenced in Sql queries.
+	 *
+	 * @param path           The path under which the table will be registered
+	 * @param catalogTable   The table to register
+	 * @param ignoreIfExists If true, do nothing if there is already same table name under
+	 *                       the {@code path}. If false, a TableAlreadyExistException throws.
+	 */
+	private void registerTableInternal(String[] path,
+			CatalogBaseTable catalogTable,
+			boolean ignoreIfExists) {
+		String[] fullName = catalogManager.getFullTablePath(Arrays.asList(path));
+		Catalog catalog = getCatalog(fullName[0]).orElseThrow(() ->
+			new TableException("Catalog " + fullName[0] + " does not exist"));
+		ObjectPath objectPath = new ObjectPath(fullName[1], fullName[2]);
 		try {
-			checkValidTableName(name);
-			ObjectPath path = new ObjectPath(defaultDatabaseName, name);
-			Optional<Catalog> catalog = catalogManager.getCatalog(defaultCatalogName);
-			if (catalog.isPresent()) {
-				catalog.get().createTable(
-					path,
-					table,
-					false);
-			}
+			catalog.createTable(objectPath, catalogTable, ignoreIfExists);
 		} catch (Exception e) {
 			throw new TableException("Could not register table", e);
 		}
@@ -477,7 +527,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 					"Table '%s' already exists. Please choose a different name.", name));
 			}
 		} else {
-			registerTableInternal(name, ConnectorCatalogTable.source(tableSource, false));
+			registerTableInternal(new String[] { name },
+				ConnectorCatalogTable.source(tableSource, false), false);
 		}
 	}
 
@@ -502,7 +553,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 					"Table '%s' already exists. Please choose a different name.", name));
 			}
 		} else {
-			registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, false));
+			registerTableInternal(new String[] { name },
+				ConnectorCatalogTable.sink(tableSink, false), false);
 		}
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c965a98..0f83176 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -387,4 +388,45 @@ public class CatalogManager {
 			return tableSink.getTableSchema();
 		}
 	}
+
+	/**
+	 * Returns the full name of the given table path, this name may be padded
+	 * with current catalog/database name based on the {@code paths} length.
+	 *
+	 * @param paths Table paths whose format can be "catalog.db.table", "db.table" or "table"
+	 * @return An array of complete table path
+	 */
+	public String[] getFullTablePath(List<String> paths) {
+		if (paths == null) {
+			throw new ValidationException("Table paths can not be null!");
+		}
+		if (paths.size() < 1 || paths.size() > 3) {
+			throw new ValidationException("Table paths length must be " +
+				"between 1(inclusive) and 3(inclusive)");
+		}
+		if (paths.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
+			throw new ValidationException("Table paths contain null or " +
+				"while-space-only string");
+		}
+
+		if (paths.size() == 3) {
+			return new String[] {paths.get(0), paths.get(1), paths.get(2)};
+		}
+
+		String catalogName;
+		String dbName;
+		String tableName;
+
+		if (paths.size() == 1) {
+			catalogName = getCurrentCatalog();
+			dbName = getCurrentDatabase();
+			tableName = paths.get(0);
+		} else {
+			catalogName = getCurrentCatalog();
+			dbName = paths.get(0);
+			tableName = paths.get(1);
+		}
+
+		return new String[]{ catalogName, dbName, tableName };
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index df15a95..5566fc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -52,7 +52,10 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 	@Override
 	public CatalogBaseTable copy() {
 		return new CatalogTableImpl(
-			getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
+			getSchema().copy(),
+			new ArrayList<>(getPartitionKeys()),
+			new HashMap<>(getProperties()),
+			getComment());
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java
new file mode 100644
index 0000000..fa8d650
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * A {@link Operation} that describes the DDL statements, e.g. CREATE TABLE or CREATE FUNCTION.
+ *
+ * <p>Different sub operations can have their special instances. For example, a
+ * create table operation will have a {@link org.apache.flink.table.catalog.CatalogTable} instance,
+ * while a create function operation will have a
+ * {@link org.apache.flink.table.catalog.CatalogFunction} instance.
+ */
+@Internal
+public interface CreateOperation extends Operation {
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
new file mode 100644
index 0000000..3c806f6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a CREATE TABLE statement.
+ */
+public class CreateTableOperation implements CreateOperation {
+	private final String[] tablePath;
+	private CatalogTable catalogTable;
+	private boolean ignoreIfExists;
+
+	public CreateTableOperation(String[] tablePath,
+			CatalogTable catalogTable,
+			boolean ignoreIfExists) {
+		this.tablePath = tablePath;
+		this.catalogTable = catalogTable;
+		this.ignoreIfExists = ignoreIfExists;
+	}
+
+	public CatalogTable getCatalogTable() {
+		return catalogTable;
+	}
+
+	public String[] getTablePath() {
+		return tablePath;
+	}
+
+	public boolean isIgnoreIfExists() {
+		return ignoreIfExists;
+	}
+
+	@Override
+	public String asSummaryString() {
+		Map<String, Object> params = new LinkedHashMap<>();
+		params.put("catalogTable", catalogTable.toProperties());
+		params.put("tablePath", tablePath);
+		params.put("ignoreIfExists", ignoreIfExists);
+
+		return OperationUtils.formatWithChildren(
+			"CREATE TABLE",
+			params,
+			Collections.emptyList(),
+			Operation::asSummaryString);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 0790b80..5025adf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -41,10 +41,12 @@ import java.util.Set;
 @Internal
 public class CatalogCalciteSchema implements Schema {
 
+	private final boolean isBatch;
 	private final String catalogName;
 	private final Catalog catalog;
 
-	public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+	public CatalogCalciteSchema(boolean isBatch, String catalogName, Catalog catalog) {
+		this.isBatch = isBatch;
 		this.catalogName = catalogName;
 		this.catalog = catalog;
 	}
@@ -59,7 +61,7 @@ public class CatalogCalciteSchema implements Schema {
 	public Schema getSubSchema(String schemaName) {
 
 		if (catalog.databaseExists(schemaName)) {
-			return new DatabaseCalciteSchema(schemaName, catalogName, catalog);
+			return new DatabaseCalciteSchema(isBatch, schemaName, catalogName, catalog);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index cccd275..ceef249 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -93,7 +93,7 @@ public class CatalogManagerCalciteSchema implements Schema {
 
 		return externalSchema.orElseGet(() ->
 			catalogManager.getCatalog(name)
-				.map(catalog -> new CatalogCalciteSchema(name, catalog))
+				.map(catalog -> new CatalogCalciteSchema(isBatch, name, catalog))
 				.orElse(null)
 		);
 	}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 95475ef..ac7cdb8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -51,11 +51,17 @@ import static java.lang.String.format;
  * Tables are registered as tables in the schema.
  */
 class DatabaseCalciteSchema implements Schema {
+	private final boolean isBatch;
 	private final String databaseName;
 	private final String catalogName;
 	private final Catalog catalog;
 
-	public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
+	public DatabaseCalciteSchema(
+			boolean isBatch,
+			String databaseName,
+			String catalogName,
+			Catalog catalog) {
+		this.isBatch = isBatch;
 		this.databaseName = databaseName;
 		this.catalogName = catalogName;
 		this.catalog = catalog;
@@ -126,7 +132,7 @@ class DatabaseCalciteSchema implements Schema {
 			// this means the TableSource extends from StreamTableSource, this is needed for the
 			// legacy Planner. Blink Planner should use the information that comes from the TableSource
 			// itself to determine if it is a streaming or batch source.
-			true,
+			!isBatch,
 			FlinkStatistic.UNKNOWN()
 		);
 	}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
new file mode 100644
index 0000000..c49a31d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+/**
+ * Exception thrown during the execution of SQL statements.
+ */
+public class SqlConversionException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public SqlConversionException(String message) {
+		super(message);
+	}
+
+	public SqlConversionException(String message, Throwable e) {
+		super(message, e);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
deleted file mode 100644
index c3c6816..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sqlexec;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.util.ReflectUtil;
-import org.apache.calcite.util.ReflectiveVisitor;
-
-/**
- * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
- * executed directly.
- *
- * <p>For every kind of {@link SqlNode}, there needs a method named
- * #execute(type), the 'type' argument should be the subclass
- * type for the supported {@link SqlNode}.
- */
-public class SqlExecutableStatements implements ReflectiveVisitor {
-	private TableEnvironment tableEnv;
-
-	private final ReflectUtil.MethodDispatcher<Void> dispatcher =
-		ReflectUtil.createMethodDispatcher(Void.class,
-			this,
-			"execute",
-			SqlNode.class);
-
-	//~ Constructors -----------------------------------------------------------
-
-	private SqlExecutableStatements(TableEnvironment tableEnvironment) {
-		this.tableEnv = tableEnvironment;
-	}
-
-	/**
-	 * This is the main entrance of executing all kinds of DDL/DML {@code SqlNode}s, different
-	 * SqlNode will have it's implementation in the #execute(type) method whose 'type' argument
-	 * is subclass of {@code SqlNode}.
-	 *
-	 * <p>Caution that the {@link #execute(SqlNode)} should never expect to be invoked.
-	 *
-	 * @param tableEnvironment TableEnvironment to interact with
-	 * @param sqlNode          SqlNode to execute on
-	 */
-	public static void executeSqlNode(TableEnvironment tableEnvironment, SqlNode sqlNode) {
-		SqlExecutableStatements statement = new SqlExecutableStatements(tableEnvironment);
-		statement.dispatcher.invoke(sqlNode);
-	}
-
-	/**
-	 * Execute the {@link SqlCreateTable} node.
-	 */
-	public void execute(SqlCreateTable sqlCreateTable) {
-		// need to implement.
-	}
-
-	/** Fallback method to throw exception. */
-	public void execute(SqlNode node) {
-		throw new TableException("Should not invoke to node type "
-			+ node.getClass().getSimpleName());
-	}
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
new file mode 100644
index 0000000..88a9d94
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.PlannerQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
+ * converted to {@link Operation}.
+ *
+ * <p>For every kind of {@link SqlNode}, there needs to have a corresponding
+ * #convert(type) method, the 'type' argument should be the subclass
+ * of the supported {@link SqlNode}.
+ *
+ * <p>Every #convert() should return a {@link Operation} which can be used in
+ * {@link org.apache.flink.table.delegation.Planner}.
+ */
+public class SqlToOperationConverter {
+	private FlinkPlannerImpl flinkPlanner;
+
+	//~ Constructors -----------------------------------------------------------
+
+	private SqlToOperationConverter(FlinkPlannerImpl flinkPlanner) {
+		this.flinkPlanner = flinkPlanner;
+	}
+
+	/**
+	 * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
+	 * SqlNode will have it's implementation in the #execute(type) method whose 'type' argument
+	 * is subclass of {@code SqlNode}.
+	 *
+	 * @param flinkPlanner     FlinkPlannerImpl to convert sql node to rel node
+	 * @param sqlNode          SqlNode to execute on
+	 */
+	public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
+		// validate the query
+		final SqlNode validated = flinkPlanner.validate(sqlNode);
+		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
+		if (validated instanceof SqlCreateTable) {
+			return converter.convert((SqlCreateTable) validated);
+		} else {
+			return converter.convert(validated);
+		}
+	}
+
+	/**
+	 * Convert the {@link SqlCreateTable} node.
+	 */
+	private Operation convert(SqlCreateTable sqlCreateTable) {
+		// primary key and unique keys are not supported
+		if ((sqlCreateTable.getPrimaryKeyList() != null
+				&& sqlCreateTable.getPrimaryKeyList().size() > 0)
+			|| (sqlCreateTable.getUniqueKeysList() != null
+				&& sqlCreateTable.getUniqueKeysList().size() > 0)) {
+			throw new SqlConversionException("Primary key and unique key are not supported yet.");
+		}
+
+		// set with properties
+		SqlNodeList propertyList = sqlCreateTable.getPropertyList();
+		Map<String, String> properties = new HashMap<>();
+		if (propertyList != null) {
+			propertyList.getList().forEach(p ->
+				properties.put(((SqlProperty) p).getKeyString().toLowerCase(),
+					((SqlProperty) p).getValueString()));
+		}
+
+		TableSchema tableSchema = createTableSchema(sqlCreateTable,
+			new FlinkTypeFactory(new FlinkTypeSystem())); // need to make type factory singleton ?
+		String tableComment = "";
+		if (sqlCreateTable.getComment() != null) {
+			tableComment = sqlCreateTable.getComment().getNlsString().getValue();
+		}
+		// set partition key
+		List<String> partitionKeys = new ArrayList<>();
+		SqlNodeList partitionKey = sqlCreateTable.getPartitionKeyList();
+		if (partitionKey != null) {
+			partitionKeys = partitionKey
+				.getList()
+				.stream()
+				.map(p -> ((SqlIdentifier) p).getSimple())
+				.collect(Collectors.toList());
+		}
+		CatalogTable catalogTable = new CatalogTableImpl(tableSchema,
+			partitionKeys,
+			properties,
+			tableComment);
+		return new CreateTableOperation(sqlCreateTable.fullTableName(), catalogTable,
+			sqlCreateTable.isIfNotExists());
+	}
+
+	/** Fallback method for sql query. */
+	private Operation convert(SqlNode node) {
+		if (node.getKind().belongsTo(SqlKind.QUERY)) {
+			return toQueryOperation(flinkPlanner, node);
+		} else {
+			throw new TableException("Should not invoke to node type "
+				+ node.getClass().getSimpleName());
+		}
+	}
+
+	//~ Tools ------------------------------------------------------------------
+
+	/**
+	 * Create a table schema from {@link SqlCreateTable}. This schema contains computed column
+	 * fields, say, we have a create table DDL statement:
+	 * <blockquote><pre>
+	 *   create table t(
+	 *     a int,
+	 *     b varchar,
+	 *     c as to_timestamp(b))
+	 *   with (
+	 *     connector = 'csv',
+	 *     k1 = 'v1')
+	 * </pre></blockquote>
+	 *
+	 * <p>The returned table schema contains columns (a:int, b:varchar, c:timestamp).
+	 *
+	 * @param sqlCreateTable sql create table node.
+	 * @param factory        FlinkTypeFactory instance.
+	 * @return TableSchema
+	 */
+	private TableSchema createTableSchema(SqlCreateTable sqlCreateTable,
+			FlinkTypeFactory factory) {
+		// setup table columns
+		SqlNodeList columnList = sqlCreateTable.getColumnList();
+		TableSchema physicalSchema = null;
+		TableSchema.Builder builder = new TableSchema.Builder();
+		// collect the physical table schema first.
+		final List<SqlNode> physicalColumns = columnList.getList().stream()
+			.filter(n -> n instanceof SqlTableColumn).collect(Collectors.toList());
+		for (SqlNode node : physicalColumns) {
+			SqlTableColumn column = (SqlTableColumn) node;
+			final RelDataType relType = column.getType().deriveType(factory,
+				column.getType().getNullable());
+			builder.field(column.getName().getSimple(),
+				TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)));
+			physicalSchema = builder.build();
+		}
+		assert physicalSchema != null;
+		if (sqlCreateTable.containsComputedColumn()) {
+			throw new SqlConversionException("Computed columns for DDL is not supported yet!");
+		}
+		return physicalSchema;
+	}
+
+	private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
+		// transform to a relational tree
+		RelRoot relational = planner.rel(validated);
+		return new PlannerQueryOperation(relational.rel);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 24e52d7..36e30cb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal
 
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.sql.parser.ddl.SqlCreateTable
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.catalog._
@@ -27,11 +28,13 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
+import org.apache.flink.table.operations.ddl.CreateTableOperation
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
 import org.apache.flink.table.planner.PlanningConfigurationBuilder
 import org.apache.flink.table.sinks.{TableSink, TableSinkUtils}
 import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.util.StringUtils
 
@@ -43,6 +46,7 @@ import org.apache.calcite.tools.FrameworkConfig
 import _root_.java.util.Optional
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
 
 /**
   * The abstract base class for the implementation of batch TableEnvironment.
@@ -188,7 +192,7 @@ abstract class TableEnvImpl(
     }
 
     val tableTable = new QueryOperationCatalogView(table.getQueryOperation)
-    registerTableInternal(name, tableTable)
+    registerTableInternal(Array[String](name), tableTable, ignoreIfExists = false)
   }
 
   override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
@@ -274,7 +278,8 @@ abstract class TableEnvImpl(
 
       // no table is registered
       case _ =>
-        registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatch))
+        registerTableInternal(Array[String](name),
+          ConnectorCatalogTable.source(tableSource, isBatch), ignoreIfExists = false)
     }
   }
 
@@ -301,7 +306,8 @@ abstract class TableEnvImpl(
 
       // no table is registered
       case _ =>
-        registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatch))
+        registerTableInternal(Array[String](name),
+          ConnectorCatalogTable.sink(tableSink, isBatch), ignoreIfExists = false)
     }
   }
 
@@ -311,17 +317,33 @@ abstract class TableEnvImpl(
     }
   }
 
-  protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
-    checkValidTableName(name)
-    val path = new ObjectPath(defaultDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(defaultCatalogName)) match {
-      case Some(catalog) =>
-        catalog.createTable(
-          path,
-          table,
-          false)
-      case None => throw new TableException("The default catalog does not exist.")
-    }
+  /**
+    * Registers a [[CatalogTable]] under a given object path. The `path` could be
+    * 3 formats:
+    * <ol>
+    * <li>`catalog.db.table`: A full table path including the catalog name,
+    * the database name and table name.</li>
+    * <li>`db.table`: database name following table name, with the current catalog name.</li>
+    * <li>`table`: Only the table name, with the current catalog name and database  name.</li>
+    * </ol>
+    * The registered tables then can be referenced in Sql queries.
+    *
+    * @param path           The path under which the table will be registered
+    * @param catalogTable   The table to register
+    * @param ignoreIfExists If true, do nothing if there is already same table name under
+    *                       the { @code path}. If false, a TableAlreadyExistException throws.
+    */
+  private def registerTableInternal(
+      path: Array[String],
+      catalogTable: CatalogBaseTable,
+      ignoreIfExists: Boolean): Unit = {
+    val fullName = catalogManager.getFullTablePath(path.toList)
+    val catalog = getCatalog(fullName(0)).orElseThrow(
+      new _root_.java.util.function.Supplier[TableException] {
+        override def get = new TableException(s"Catalog ${fullName(0)} does not exist")
+      })
+    val objectPath = new ObjectPath(fullName(1), fullName(2))
+    catalog.createTable(objectPath, catalogTable, ignoreIfExists)
   }
 
   protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
@@ -380,6 +402,45 @@ abstract class TableEnvImpl(
     planner.getCompletionHints(statement, position)
   }
 
+  override def sql(statement: String): Table = {
+    val planner = getFlinkPlanner
+    // parse the sql query
+    val parsed = planner.parse(statement)
+    if (null != parsed) {
+      parsed match {
+        case insert: SqlInsert =>
+          val query = insert.getSource
+          val tableOperation = SqlToOperationConverter
+            .convert(planner, query)
+            .asInstanceOf[QueryOperation]
+          // get query result as Table
+          val queryResult = createTable(tableOperation)
+
+          // get name of sink table
+          val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+          // insert query result into sink table
+          insertInto(queryResult, targetTablePath.asScala:_*)
+          // returns null for SQL INSERT statement now
+          null
+        case createTable: SqlCreateTable =>
+          val operation = SqlToOperationConverter
+            .convert(planner, createTable)
+            .asInstanceOf[CreateTableOperation]
+          registerTableInternal(operation.getTablePath,
+            operation.getCatalogTable,
+            operation.isIgnoreIfExists)
+          // returns null for DDL statement now
+          null
+        case query: SqlNode if query.getKind.belongsTo(SqlKind.QUERY) =>
+          createTable(SqlToOperationConverter.convert(planner, query)
+            .asInstanceOf[PlannerQueryOperation])
+      }
+    } else {
+      throw new TableException("Unsupported SQL query!")
+    }
+  }
+
   override def sqlQuery(query: String): Table = {
     val planner = getFlinkPlanner
     // parse the sql query
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 2814497..e92bef4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -18,27 +18,31 @@
 
 package org.apache.flink.table.calcite
 
-import java.util
+import org.apache.flink.sql.parser.ExtendedSqlNode
+import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
+import org.apache.flink.table.catalog.CatalogReader
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.RelFactories
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
 import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
-import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
-import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
-import org.apache.flink.table.catalog.CatalogReader
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder, RelConversionException}
+
+import _root_.java.lang.{Boolean => JBoolean}
+import _root_.java.util
+import _root_.java.util.function.{Function => JFunction}
 
 import scala.collection.JavaConversions._
-import java.util.function.{Function => JFunction}
-import java.lang.{Boolean => JBoolean}
 
 /**
   * NOTE: this is heavily inspired by Calcite's PlannerImpl.
@@ -98,13 +102,24 @@ class FlinkPlannerImpl(
 
   def validate(sqlNode: SqlNode): SqlNode = {
     val catalogReader = catalogReaderSupplier.apply(false)
+    // do pre-validate rewrite.
+    sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
+    // do extended validation.
+    sqlNode match {
+      case node: ExtendedSqlNode =>
+        node.validate()
+      case _ =>
+    }
+    // no need to validate row type for DDL nodes.
+    if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+      return sqlNode
+    }
     validator = new FlinkCalciteSqlValidator(
       operatorTable,
       catalogReader,
       typeFactory)
     validator.setIdentifierExpansion(true)
     try {
-      sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
       validator.validate(sqlNode)
     }
     catch {
@@ -118,10 +133,11 @@ class FlinkPlannerImpl(
       assert(validatedSqlNode != null)
       val rexBuilder: RexBuilder = createRexBuilder
       val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder)
+      val catalogReader: CatalogReader = catalogReaderSupplier.apply(false)
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
         new ViewExpanderImpl,
         validator,
-        catalogReaderSupplier.apply(false),
+        catalogReader,
         cluster,
         convertletTable,
         sqlToRelConverterConfig)
@@ -176,7 +192,8 @@ class FlinkPlannerImpl(
         sqlToRelConverterConfig)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
       root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      val relBuilder = createRelBuilder(root.rel.getCluster, catalogReader)
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder))
       FlinkPlannerImpl.this.root
     }
   }
@@ -185,6 +202,11 @@ class FlinkPlannerImpl(
     new RexBuilder(typeFactory)
   }
 
+  private def createRelBuilder(
+      relOptCluster: RelOptCluster,
+      relOptSchema: RelOptSchema): RelBuilder = {
+    RelFactories.LOGICAL_BUILDER.create(relOptCluster, relOptSchema)
+  }
 }
 
 object FlinkPlannerImpl {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index fd5a5c7..502ea36 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -23,16 +23,6 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
-
-import _root_.java.lang.{Boolean => JBool}
-import _root_.java.util.{Objects, List => JList}
-import java.util
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
@@ -48,9 +38,20 @@ import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks._
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.util.JavaScalaConversionUtil
 
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+
+import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util
+import _root_.java.util.{Objects, List => JList}
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -104,18 +105,21 @@ class StreamPlanner(
         if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
           throw new ValidationException("Partial inserts are not supported")
         }
-
         // get name of sink table
         val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
 
-        List(new CatalogSinkModifyOperation(targetTablePath, toRel(planner, insert.getSource))
+        List(new CatalogSinkModifyOperation(targetTablePath,
+          SqlToOperationConverter.convert(planner,
+            insert.getSource).asInstanceOf[PlannerQueryOperation])
           .asInstanceOf[Operation]).asJava
-      case node if node.getKind.belongsTo(SqlKind.QUERY) =>
-        List(toRel(planner, parsed).asInstanceOf[Operation]).asJava
+      case node if node.getKind.belongsTo(SqlKind.QUERY) || node.getKind.belongsTo(SqlKind.DDL) =>
+        List(SqlToOperationConverter.convert(planner, parsed)).asJava
       case _ =>
         throw new TableException(
           "Unsupported SQL query! parse() only accepts SQL queries of type " +
-            "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT.")
+            "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;" +
+            "and SQL DDLs of type " +
+            "CREATE TABLE")
     }
   }
 
@@ -141,17 +145,6 @@ class StreamPlanner(
     planner.getCompletionHints(statement, position)
   }
 
-  private def toRel(
-      planner: FlinkPlannerImpl,
-      parsed: SqlNode)
-    : PlannerQueryOperation = {
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-    new PlannerQueryOperation(relational.rel)
-  }
-
   private def translate(tableOperation: ModifyOperation)
     : Transformation[_] = {
     tableOperation match {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 20d78e3..312b5de 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -48,7 +48,7 @@ public class DatabaseCalciteSchemaTest {
 	@Test
 	public void testCatalogTable() throws TableAlreadyExistException, DatabaseNotExistException {
 		GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
-		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(
+		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(false,
 			databaseName,
 			catalogName,
 			catalog);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
new file mode 100644
index 0000000..cc78be7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.BatchTableEnvImpl;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test cases for SqlExecutableStatement. **/
+public class SqlToOperationConverterTest {
+	private static final ExecutionEnvironment streamExec =
+		ExecutionEnvironment.getExecutionEnvironment();
+	private static final BatchTableEnvImpl batchEnv =
+		(BatchTableEnvImpl) BatchTableEnvironment.create(streamExec);
+
+	private static final FlinkPlannerImpl planner = batchEnv.getFlinkPlanner();
+
+	@Test
+	public void testCreateTable() {
+		final String sql = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c int, \n" +
+			"  d varchar" +
+			")\n" +
+			"  PARTITIONED BY (a, d)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
+		assertArrayEquals(catalogTable.getSchema().getFieldNames(),
+			new String[] {"a", "b", "c", "d"});
+		assertArrayEquals(catalogTable.getSchema().getFieldDataTypes(),
+			new DataType[]{
+				DataTypes.BIGINT(),
+				DataTypes.VARCHAR(Integer.MAX_VALUE),
+				DataTypes.INT(),
+				DataTypes.VARCHAR(Integer.MAX_VALUE)});
+	}
+
+	@Test(expected = SqlConversionException.class)
+	public void testCreateTableWithPkUniqueKeys() {
+		final String sql = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c int, \n" +
+			"  d varchar, \n" +
+			"  primary key(a), \n" +
+			"  unique(a, b) \n" +
+			")\n" +
+			"  PARTITIONED BY (a, d)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		SqlToOperationConverter.convert(planner, node);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c5fe13f..e28eb5d 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -22,3 +22,4 @@ org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
 org.apache.flink.table.factories.utils.TestExternalCatalogFactory
 org.apache.flink.table.catalog.TestExternalTableSourceFactory
 org.apache.flink.table.factories.utils.TestCatalogFactory
+org.apache.flink.table.factories.utils.TestCollectionTableFactory
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
new file mode 100644
index 0000000..fc97071
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory
+import org.apache.flink.types.Row
+
+import org.junit.Assert.assertEquals
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, Ignore, Test}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/** Test cases for catalog table. */
+@RunWith(classOf[Parameterized])
+class CatalogTableITCase(isStreaming: Boolean) {
+
+  private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+  private var batchEnv: BatchTableEnvironment = _
+  private val streamExec: StreamExecutionEnvironment = StreamExecutionEnvironment
+    .getExecutionEnvironment
+  private var streamEnv: StreamTableEnvironment = _
+
+  private val SOURCE_DATA = List(
+      toRow(1, "a"),
+      toRow(2, "b"),
+      toRow(3, "c")
+  )
+
+  private val DIM_DATA = List(
+    toRow(1, "aDim"),
+    toRow(2, "bDim"),
+    toRow(3, "cDim")
+  )
+
+  implicit def rowOrdering: Ordering[Row] = Ordering.by((r : Row) => {
+    val builder = new StringBuilder
+    0 until r.getArity foreach(idx => builder.append(r.getField(idx)))
+    builder.toString()
+  })
+
+  @Before
+  def before(): Unit = {
+    batchExec.setParallelism(4)
+    streamExec.setParallelism(4)
+    batchEnv = BatchTableEnvironment.create(batchExec)
+    streamEnv = StreamTableEnvironment.create(streamExec)
+    TestCollectionTableFactory.reset()
+    TestCollectionTableFactory.isStreaming = isStreaming
+  }
+
+  def toRow(args: Any*):Row = {
+    val row = new Row(args.length)
+    0 until args.length foreach {
+      i => row.setField(i, args(i))
+    }
+    row
+  }
+
+  def tableEnv: TableEnvironment = {
+    if (isStreaming) {
+      streamEnv
+    } else {
+      batchEnv
+    }
+  }
+
+  def execJob(name: String) = {
+    if (isStreaming) {
+      streamExec.execute(name)
+    } else {
+      batchExec.execute(name)
+    }
+  }
+
+  @Test
+  def testInsertInto(): Unit = {
+    val sourceData = List(
+      toRow(1, "1000", 2),
+      toRow(2, "1", 3),
+      toRow(3, "2000", 4),
+      toRow(1, "2", 2),
+      toRow(2, "3000", 3)
+    )
+    TestCollectionTableFactory.initData(sourceData)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b varchar,
+        |  c int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.b, (t1.a + 1) as c from t1
+      """.stripMargin
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Ignore // need to implement
+  @Test
+  def testInsertTargetTableWithComputedColumn(): Unit = {
+    TestCollectionTableFactory.initData(SOURCE_DATA)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b varchar,
+        |  c as a + 1
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2(a, b)
+        |select t1.a, t1.b from t1
+      """.stripMargin
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(SOURCE_DATA.sorted, TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test
+  def testInsertWithJoinedSource(): Unit = {
+    val sourceData = List(
+      toRow(1, 1000, 2),
+      toRow(2, 1, 3),
+      toRow(3, 2000, 4),
+      toRow(1, 2, 2),
+      toRow(2, 3000, 3)
+    )
+
+    val expected = List(
+      toRow(1, 1000, 2, 1),
+      toRow(1, 2, 2, 1),
+      toRow(2, 1, 1, 2),
+      toRow(2, 3000, 1, 2)
+    )
+    TestCollectionTableFactory.initData(sourceData)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b int,
+        |  c int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int,
+        |  c int,
+        |  d int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select a.a, a.b, b.a, b.b
+        |  from t1 a
+        |  join t1 b
+        |  on a.a = b.b
+      """.stripMargin
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test
+  def testInsertWithAggregateSource(): Unit = {
+    if (isStreaming) {
+      return
+    }
+    val sourceData = List(
+      toRow(1, 1000, 2),
+      toRow(2, 1000, 3),
+      toRow(3, 2000, 4),
+      toRow(4, 2000, 5),
+      toRow(5, 3000, 6)
+    )
+
+    val expected = List(
+      toRow(3, 1000),
+      toRow(5, 3000),
+      toRow(7, 2000)
+    )
+    TestCollectionTableFactory.initData(sourceData)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b int,
+        |  c int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select sum(a), t1.b from t1 group by t1.b
+      """.stripMargin
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
+  }
+
+  @Test @Ignore // need to implement
+  def testStreamSourceTableWithProctime(): Unit = {
+    val sourceData = List(
+      toRow(1, 1000),
+      toRow(2, 2000),
+      toRow(3, 3000)
+    )
+    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b int,
+        |  c as proctime,
+        |  primary key(a)
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
+      """.stripMargin
+
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
+  }
+
+  @Test @Ignore // need to implement
+  def testStreamSourceTableWithRowtime(): Unit = {
+    val sourceData = List(
+      toRow(1, 1000),
+      toRow(2, 2000),
+      toRow(3, 3000)
+    )
+    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  primary key(a),
+        |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a bigint,
+        |  b bigint
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
+      """.stripMargin
+
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
+  }
+
+  @Test @Ignore // need to implement
+  def testBatchSourceTableWithProctime(): Unit = {
+    val sourceData = List(
+      toRow(1, 1000),
+      toRow(2, 2000),
+      toRow(3, 3000)
+    )
+    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b int,
+        |  c as proctime,
+        |  primary key(a)
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
+      """.stripMargin
+
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
+  }
+
+  @Test @Ignore // need to implement
+  def testBatchTableWithRowtime(): Unit = {
+    val sourceData = List(
+      toRow(1, 1000),
+      toRow(2, 2000),
+      toRow(3, 3000)
+    )
+    TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  primary key(a),
+        |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a bigint,
+        |  b bigint
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
+      """.stripMargin
+
+    tableEnv.sql(sourceDDL)
+    tableEnv.sql(sinkDDL)
+    tableEnv.sql(query)
+    execJob("testJob")
+    assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
+  }
+}
+
+object CatalogTableITCase {
+  @Parameterized.Parameters(name = "{0}")
+  def parameters(): java.util.Collection[Boolean] = {
+    util.Arrays.asList(true, false)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
new file mode 100644
index 0000000..3b17986
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+
+import java.io.IOException
+import java.util
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+class TestCollectionTableFactory
+  extends StreamTableSourceFactory[Row]
+  with StreamTableSinkFactory[Row]
+  with BatchTableSourceFactory[Row]
+  with BatchTableSinkFactory[Row]
+{
+
+  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
+    getCollectionSource(properties, isStreaming = TestCollectionTableFactory.isStreaming)
+  }
+
+  override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
+    getCollectionSource(properties, isStreaming = true)
+  }
+
+  override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
+    getCollectionSource(properties, isStreaming = false)
+  }
+
+  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
+    getCollectionSink(properties)
+  }
+
+  override def requiredContext(): JMap[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR, "COLLECTION")
+    context
+  }
+
+  override def supportedProperties(): JList[String] = {
+    val supported = new JArrayList[String]()
+    supported.add("*")
+    supported
+  }
+}
+
+object TestCollectionTableFactory {
+  var isStreaming: Boolean = true
+
+  val SOURCE_DATA = new JLinkedList[Row]()
+  val DIM_DATA = new JLinkedList[Row]()
+  val RESULT = new JLinkedList[Row]()
+  private var emitIntervalMS = -1L
+
+  def initData(sourceData: JList[Row],
+      dimData: JList[Row] = List(),
+      emitInterval: Long = -1L): Unit ={
+    SOURCE_DATA.addAll(sourceData)
+    DIM_DATA.addAll(dimData)
+    emitIntervalMS = emitInterval
+  }
+
+  def reset(): Unit ={
+    RESULT.clear()
+    SOURCE_DATA.clear()
+    DIM_DATA.clear()
+    emitIntervalMS = -1L
+  }
+
+  def getCollectionSource(props: JMap[String, String],
+      isStreaming: Boolean): CollectionTableSource = {
+    val properties = new DescriptorProperties()
+    properties.putProperties(props)
+    val schema = properties.getTableSchema(Schema.SCHEMA)
+    new CollectionTableSource(emitIntervalMS, schema, isStreaming)
+  }
+
+  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
+    val properties = new DescriptorProperties()
+    properties.putProperties(props)
+    val schema = properties.getTableSchema(Schema.SCHEMA)
+    new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
+  }
+
+  /**
+    * Table source of collection.
+    */
+  class CollectionTableSource(
+      val emitIntervalMs: Long,
+      val schema: TableSchema,
+      val isStreaming: Boolean)
+    extends BatchTableSource[Row]
+    with StreamTableSource[Row]
+    with LookupableTableSource[Row] {
+
+    private val rowType: TypeInformation[Row] = schema.toRowType
+
+    override def isBounded: Boolean = !isStreaming
+
+    def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+      execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+        SOURCE_DATA,
+        rowType.createSerializer(new ExecutionConfig)),
+        rowType)
+    }
+
+    override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = {
+      streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+        SOURCE_DATA,
+        rowType.createSerializer(new ExecutionConfig)),
+        rowType)
+    }
+
+    override def getReturnType: TypeInformation[Row] = rowType
+
+    override def getTableSchema: TableSchema = {
+      schema
+    }
+
+    override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = {
+      new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_)))
+    }
+
+    override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null
+
+    override def isAsyncEnabled: Boolean = false
+  }
+
+  /**
+    * Table sink of collection.
+    */
+  class CollectionTableSink(val outputType: RowTypeInfo)
+      extends BatchTableSink[Row]
+      with AppendStreamTableSink[Row] {
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
+    }
+
+    override def getOutputType: RowTypeInfo = outputType
+
+    override def getFieldNames: Array[String] = outputType.getFieldNames
+
+    override def getFieldTypes: Array[TypeInformation[_]] = {
+      outputType.getFieldTypes
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+    }
+
+    override def configure(fieldNames: Array[String],
+        fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+  }
+
+  /**
+    * Sink function of unsafe memory.
+    */
+  class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] {
+    private var serializer: TypeSerializer[Row] = _
+
+    override def open(param: Configuration): Unit = {
+      serializer = outputType.createSerializer(new ExecutionConfig)
+    }
+
+    @throws[Exception]
+    override def invoke(row: Row): Unit = {
+      RESULT.add(serializer.copy(row))
+    }
+  }
+
+  /**
+    * Collection inputFormat for testing.
+    */
+  class TestCollectionInputFormat[T](
+      val emitIntervalMs: Long,
+      val dataSet: java.util.Collection[T],
+      val serializer: TypeSerializer[T])
+    extends CollectionInputFormat[T](dataSet, serializer) {
+    @throws[IOException]
+    override def reachedEnd: Boolean = {
+      if (emitIntervalMs > 0) {
+        try
+          Thread.sleep(emitIntervalMs)
+        catch {
+          case _: InterruptedException =>
+        }
+      }
+      super.reachedEnd
+    }
+  }
+
+  /**
+    * Dimension table source fetcher.
+    */
+  class TemporalTableFetcher(
+      val dimData: JLinkedList[Row],
+      val keys: Array[Int]) extends TableFunction[Row] {
+
+    @throws[Exception]
+    def eval(values: Any*): Unit = {
+      for (data <- dimData) {
+        var matched = true
+        var idx = 0
+        while (matched && idx < keys.length) {
+          val dimField = data.getField(keys(idx))
+          val inputField = values(idx)
+          matched = dimField.equals(inputField)
+          idx += 1
+        }
+        if (matched) {
+          // copy the row data
+          val ret = new Row(data.getArity)
+          0 until data.getArity foreach { idx =>
+            ret.setField(idx, data.getField(idx))
+          }
+          collect(ret)
+        }
+      }
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index cb4567f..af72842 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -96,4 +96,6 @@ class MockTableEnvironment extends TableEnvironment {
     sinkPathContinued: String*): Unit = ???
 
   override def execute(jobName: String): JobExecutionResult = ???
+
+  override def sql(statement: String): Table = ???
 }