You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 13:32:12 UTC

[flink] branch master updated: [FLINK-13209][table-api] Revert TableEnvironment#sql and move ddl support to sqlUpdate

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b0bf838  [FLINK-13209][table-api] Revert TableEnvironment#sql and move ddl support to sqlUpdate
b0bf838 is described below

commit b0bf8383330d9c5d8f824c438a4a4cf1044f25ba
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 14:43:33 2019 +0800

    [FLINK-13209][table-api] Revert TableEnvironment#sql and move ddl support to sqlUpdate
    
    This closes #9081
---
 flink-python/pyflink/table/table_environment.py    |  82 +++++++++-----
 .../table/tests/test_environment_completeness.py   |   3 +-
 .../table/tests/test_table_environment_api.py      |   4 +-
 .../apache/flink/table/api/TableEnvironment.java   | 119 +++++++++------------
 .../table/api/internal/TableEnvironmentImpl.java   |  44 ++------
 .../flink/table/api/internal/TableEnvImpl.scala    |  46 ++------
 .../flink/table/catalog/CatalogTableITCase.scala   |  48 ++++-----
 .../flink/table/utils/MockTableEnvironment.scala   |   2 -
 8 files changed, 145 insertions(+), 203 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index f3f6497..5def53a 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -234,14 +234,49 @@ class TableEnvironment(object):
         """
         return self._j_tenv.explain(table._j_table)
 
-    def sql(self, query):
+    def sql_query(self, query):
         """
-        Evaluates single sql statement including DDLs and DMLs.
+        Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
 
-        Note: Always use this interface to execute a sql query. It only supports
-        to execute one sql statement a time.
+        All tables referenced by the query must be registered in the TableEnvironment.
 
-        A DDL statement can execute to create/drop a table/view:
+        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+        called, for example when it is embedded into a String.
+
+        Hence, SQL queries can directly reference a :class:`Table` as follows:
+        ::
+
+            >>> table = ...
+            # the table is not registered to the table environment
+            >>> table_env.sql_query("SELECT * FROM %s" % table)
+
+        :param query: The sql query string.
+        :return: The result :class:`Table`.
+        """
+        j_table = self._j_tenv.sqlQuery(query)
+        return Table(j_table)
+
+    def sql_update(self, stmt):
+        """
+        Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+
+        .. note::
+
+            Currently only SQL INSERT statements and CREATE TABLE statements are supported.
+
+        All tables referenced by the query must be registered in the TableEnvironment.
+        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+        called, for example when it is embedded into a String.
+        Hence, SQL queries can directly reference a :class:`Table` as follows:
+        ::
+
+            # register the table sink into which the result is inserted.
+            >>> table_env.register_table_sink("sink_table", table_sink)
+            >>> source_table = ...
+            # source_table is not registered to the table environment
+            >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
+
+        A DDL statement can also be executed to create a table:
         For example, the below DDL statement would create a CSV table named `tbl1`
         into the current catalog::
 
@@ -250,18 +285,11 @@ class TableEnvironment(object):
                 b bigint,
                 c varchar
             ) with (
-                connector = 'csv',
-                csv.path = 'xxx'
+                connector.type = 'filesystem',
+                format.type = 'csv',
+                connector.path = 'xxx'
             )
 
-        The returns table format for different kind of statement:
-
-        DDL: returns None.
-
-        DML: a sql insert returns None; a sql query(select) returns a table
-        to describe the query data set, it can be further queried through the Table API,
-        or directly write to sink with :func:`Table.insert_into`.
-
         SQL queries can directly execute as follows:
         ::
 
@@ -271,9 +299,11 @@ class TableEnvironment(object):
             ...     a int,
             ...     b varchar
             ... ) with (
-            ...     connector = 'kafka',
-            ...     kafka.topic = 'xxx',
-            ...     kafka.endpoint = 'x.x.x'
+            ...     connector.type = 'kafka',
+            ...     `update-mode` = 'append',
+            ...     connector.topic = 'xxx',
+            ...     connector.properties.0.key = 'k0',
+            ...     connector.properties.0.value = 'v0'
             ... )
             ... '''
 
@@ -283,8 +313,9 @@ class TableEnvironment(object):
             ...     a int,
             ...     b varchar
             ... ) with (
-            ...     connector = 'csv',
-            ...     csv.path = 'xxx'
+            ...     connector.type = 'filesystem',
+            ...     format.type = 'csv',
+            ...     connector.path = 'xxx'
             ... )
             ... '''
 
@@ -294,14 +325,11 @@ class TableEnvironment(object):
             >>> table_env.sql(query)
             >>> table_env.execute("MyJob")
 
-        This code snippet creates a job to read data from Kafka source into a CSV sink.
-
-        :param query: The SQL statement to evaluate.
+        :param stmt: The SQL statement to evaluate.
+        :param query_config: The :class:`QueryConfig` to use.
         """
-        j_table = self._j_tenv.sql(query)
-        if j_table is None:
-            return None
-        return Table(j_table)
+        # type: (str) -> None
+        self._j_tenv.sqlUpdate(stmt)
 
     def get_current_catalog(self):
         """
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index fd95fc3..1f82445 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,11 +41,10 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
         # listTables should be supported when catalog supported in python.
         # getCompletionHints has been deprecated. It will be removed in the next release.
-        # sqlQuery and sqlUpdate has been deprecated. It will be removed in the next release.
         # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
         return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
                 'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
-                'getCompletionHints', 'create', 'sqlQuery', 'sqlUpdate'}
+                'getCompletionHints', 'create'}
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index f85d6af..3c7e0f7 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -112,7 +112,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
 
-        result = t_env.sql("select a + 1, b, c from %s" % source)
+        result = t_env.sql_query("select a + 1, b, c from %s" % source)
         result.insert_into("sinks")
         self.env.execute()
         actual = source_sink_utils.results()
@@ -129,7 +129,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
 
-        t_env.sql("insert into sinks select * from %s" % source)
+        t_env.sql_update("insert into sinks select * from %s" % source)
         self.env.execute("test_sql_job")
 
         actual = source_sink_utils.results()
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 535a6bc..c396c53 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -30,8 +30,6 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
-import javax.annotation.Nullable;
-
 import java.util.Optional;
 
 /**
@@ -323,69 +321,6 @@ public interface TableEnvironment {
 	String[] getCompletionHints(String statement, int position);
 
 	/**
-	 * Evaluates single sql statement including DDLs and DMLs.
-	 *
-	 * <p>Note: Always use this interface to execute a sql query. It only supports
-	 * to execute one sql statement a time.
-	 *
-	 * <p>A DDL statement can execute to create/drop a table/view:
-	 * For example, the below DDL statement would create a CSV table named `tbl1`
-	 * into the current catalog:
-	 * <blockquote><pre>
-	 *   create table tbl1(
-	 *     a int,
-	 *     b bigint,
-	 *     c varchar
-	 *   ) with (
-	 *     connector = 'csv',
-	 *     csv.path = 'xxx'
-	 *   )
-	 * </pre></blockquote>
-	 *
-	 * <p>The returns table format for different kind of statement:
-	 * <ul>
-	 *   <li>DDL: returns null.</li>
-	 *   <li>DML: a sql insert returns null; a sql query(select) returns
-	 *   a table to describe the query data set, it can be further queried through
-	 *   the Table API, or directly write to sink with
-	 *   {@link #insertInto(Table, String, String...)}.</li>
-	 * </ul>
-	 *
-	 * <p>SQL queries can directly execute as follows:
-	 *
-	 * <blockquote><pre>
-	 *   String sinkDDL = "create table sinkTable(
-	 *                       a int,
-	 *                       b varchar
-	 *                     ) with (
-	 *                       connector = 'csv',
-	 *                       csv.path = 'xxx'
-	 *                     )";
-	 *
-	 *  String sourceDDL ="create table sourceTable(
-	 *                       a int,
-	 *                       b varchar
-	 *                     ) with (
-	 *                       connector = 'kafka',
-	 *                       kafka.topic = 'xxx',
-	 *                       kafka.endpoint = 'x.x.x'
-	 *                     )";
-	 *
-	 *  String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
-	 *
-	 *  tEnv.sql(sourceDDL);
-	 *  tEnv.sql(sinkDDL);
-	 *  tEnv.sql(query);
-	 *  tEnv.execute("MyJob");
-	 * </pre></blockquote>
-	 * This code snippet creates a job to read data from Kafka source into a CSV sink.
-	 *
-	 * @param statement The SQL statement to evaluate.
-	 */
-	@Nullable
-	Table sql(String statement);
-
-	/**
 	 * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
 	 *
 	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
@@ -402,17 +337,14 @@ public interface TableEnvironment {
 	 * }
 	 * </pre>
 	 *
-	 * @deprecated Use {@link #sql(String)}.
-	 *
 	 * @param query The SQL query to evaluate.
 	 * @return The result of the query as Table
 	 */
-	@Deprecated
 	Table sqlQuery(String query);
 
 	/**
 	 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
-	 * NOTE: Currently only SQL INSERT statements are supported.
+	 * NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported.
 	 *
 	 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 	 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
@@ -430,11 +362,56 @@ public interface TableEnvironment {
 	 * }
 	 * </pre>
 	 *
-	 * @deprecated Use {@link #sql(String)}.
+	 * <p>A DDL statement can also be executed to create a table:
+	 * For example, the below DDL statement would create a CSV table named `tbl1`
+	 * into the current catalog:
+	 * <blockquote><pre>
+	 *    create table tbl1(
+	 *      a int,
+	 *      b bigint,
+	 *      c varchar
+	 *    ) with (
+	 *      connector.type = 'filesystem',
+	 *      format.type = 'csv',
+	 *      connector.path = 'xxx'
+	 *    )
+	 * </pre></blockquote>
+	 *
+	 * <p>SQL queries can directly execute as follows:
+	 *
+	 * <blockquote><pre>
+	 *    String sinkDDL = "create table sinkTable(
+	 *                        a int,
+	 *                        b varchar
+	 *                      ) with (
+	 *                        connector.type = 'filesystem',
+	 *                        format.type = 'csv',
+	 *                        connector.path = 'xxx'
+	 *                      )";
+	 *
+	 *    String sourceDDL ="create table sourceTable(
+	 *                        a int,
+	 *                        b varchar
+	 *                      ) with (
+	 *                        connector.type = 'kafka',
+	 *                        `update-mode` = 'append',
+	 *                        connector.topic = 'xxx',
+	 *                        connector.properties.0.key = 'k0',
+	 *                        connector.properties.0.value = 'v0',
+	 *                        ...
+	 *                      )";
+	 *
+	 *    String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
+	 *
+	 *    tEnv.sqlUpdate(sourceDDL);
+	 *    tEnv.sqlUpdate(sinkDDL);
+	 *    tEnv.sqlUpdate(query);
+	 *    tEnv.execute("MyJob");
+	 * </pre></blockquote>
+	 * This code snippet creates a job to read data from Kafka source into a CSV sink.
 	 *
 	 * @param stmt The SQL statement to evaluate.
 	 */
-	@Deprecated
 	void sqlUpdate(String stmt);
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 34685c0..9f98d2b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -61,7 +61,6 @@ import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.ArrayList;
@@ -295,40 +294,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
-	public Table sql(String statement) {
-		List<Operation> operations = planner.parse(statement);
-		Preconditions.checkState(operations.size() == 1,
-			"sql() only accepts a single SQL statement a time.");
-		Operation operation = operations.get(0);
-		if (operation instanceof CreateTableOperation) {
-			CreateTableOperation createTableOperation = (CreateTableOperation) operation;
-			registerCatalogTableInternal(
-				createTableOperation.getTablePath(),
-				createTableOperation.getCatalogTable(),
-				createTableOperation.isIgnoreIfExists());
-			// returns null for DDL statement now.
-			return null;
-		} else if (operation instanceof ModifyOperation) {
-			List<ModifyOperation> modifyOperations =
-				Collections.singletonList((ModifyOperation) operation);
-			if (isEagerOperationTranslation()) {
-				translate(modifyOperations);
-			} else {
-				buffer(modifyOperations);
-			}
-			// returns null for SQL INSERT statement now.
-			return null;
-		} else if (operation instanceof QueryOperation){
-			return createTable((QueryOperation) operation);
-		} else {
-			throw new ValidationException(
-				"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
-					"SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY; or DDL of type " +
-					"CREATE TABLE, CREATE VIEW; or DML of type INSERT INTO.");
-		}
-	}
-
-	@Override
 	public Table sqlQuery(String query) {
 		List<Operation> operations = planner.parse(query);
 
@@ -371,7 +336,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		if (operations.size() != 1) {
 			throw new TableException(
-				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type INSERT.");
+				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
+					"INSERT or CREATE TABLE");
 		}
 
 		Operation operation = operations.get(0);
@@ -383,6 +349,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			} else {
 				buffer(modifyOperations);
 			}
+		} else if (operation instanceof CreateTableOperation) {
+			CreateTableOperation createTableOperation = (CreateTableOperation) operation;
+			registerCatalogTableInternal(
+				createTableOperation.getTablePath(),
+				createTableOperation.getCatalogTable(),
+				createTableOperation.isIgnoreIfExists());
 		} else {
 			throw new TableException(
 				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3507c81..75f9fa0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -414,45 +414,6 @@ abstract class TableEnvImpl(
     planner.getCompletionHints(statement, position)
   }
 
-  override def sql(statement: String): Table = {
-    val planner = getFlinkPlanner
-    // parse the sql query
-    val parsed = planner.parse(statement)
-    if (null != parsed) {
-      parsed match {
-        case insert: SqlInsert =>
-          val query = insert.getSource
-          val tableOperation = SqlToOperationConverter
-            .convert(planner, query)
-            .asInstanceOf[QueryOperation]
-          // get query result as Table
-          val queryResult = createTable(tableOperation)
-
-          // get name of sink table
-          val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
-          // insert query result into sink table
-          insertInto(queryResult, targetTablePath.asScala:_*)
-          // returns null for SQL INSERT statement now
-          null
-        case createTable: SqlCreateTable =>
-          val operation = SqlToOperationConverter
-            .convert(planner, createTable)
-            .asInstanceOf[CreateTableOperation]
-          registerCatalogTableInternal(operation.getTablePath,
-            operation.getCatalogTable,
-            operation.isIgnoreIfExists)
-          // returns null for DDL statement now
-          null
-        case query: SqlNode if query.getKind.belongsTo(SqlKind.QUERY) =>
-          createTable(SqlToOperationConverter.convert(planner, query)
-            .asInstanceOf[PlannerQueryOperation])
-      }
-    } else {
-      throw new TableException("Unsupported SQL query!")
-    }
-  }
-
   override def sqlQuery(query: String): Table = {
     val planner = getFlinkPlanner
     // parse the sql query
@@ -489,6 +450,13 @@ abstract class TableEnvImpl(
 
         // insert query result into sink table
         insertInto(queryResult, targetTablePath.asScala:_*)
+      case createTable: SqlCreateTable =>
+        val operation = SqlToOperationConverter
+          .convert(planner, createTable)
+          .asInstanceOf[CreateTableOperation]
+        registerCatalogTableInternal(operation.getTablePath,
+          operation.getCatalogTable,
+          operation.isIgnoreIfExists)
       case _ =>
         throw new TableException(
           "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index fc97071..28983c5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -131,9 +131,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |insert into t2
         |select t1.a, t1.b, (t1.a + 1) as c from t1
       """.stripMargin
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
   }
@@ -167,9 +167,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |insert into t2(a, b)
         |select t1.a, t1.b from t1
       """.stripMargin
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(SOURCE_DATA.sorted, TestCollectionTableFactory.RESULT.sorted)
   }
@@ -220,9 +220,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  join t1 b
         |  on a.a = b.b
       """.stripMargin
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
   }
@@ -270,9 +270,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |insert into t2
         |select sum(a), t1.b from t1 group by t1.b
       """.stripMargin
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
   }
@@ -311,9 +311,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
       """.stripMargin
 
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
@@ -352,9 +352,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
       """.stripMargin
 
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
@@ -393,9 +393,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
       """.stripMargin
 
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
@@ -434,9 +434,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
       """.stripMargin
 
-    tableEnv.sql(sourceDDL)
-    tableEnv.sql(sinkDDL)
-    tableEnv.sql(query)
+    tableEnv.sqlUpdate(sourceDDL)
+    tableEnv.sqlUpdate(sinkDDL)
+    tableEnv.sqlUpdate(query)
     execJob("testJob")
     assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index af72842..cb4567f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -96,6 +96,4 @@ class MockTableEnvironment extends TableEnvironment {
     sinkPathContinued: String*): Unit = ???
 
   override def execute(jobName: String): JobExecutionResult = ???
-
-  override def sql(statement: String): Table = ???
 }