Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 13:32:12 UTC
[flink] branch master updated: [FLINK-13209][table-api] Revert
TableEnvironment#sql and move ddl support to sqlUpdate
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b0bf838 [FLINK-13209][table-api] Revert TableEnvironment#sql and move ddl support to sqlUpdate
b0bf838 is described below
commit b0bf8383330d9c5d8f824c438a4a4cf1044f25ba
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 14:43:33 2019 +0800
[FLINK-13209][table-api] Revert TableEnvironment#sql and move ddl support to sqlUpdate
This closes #9081
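In short: after this commit, sqlQuery() is the sole entry point for SELECT-style statements, while sqlUpdate() handles INSERT as well as CREATE TABLE DDL. Below is a minimal end-to-end sketch of the resulting Java API, assuming the master-branch TableEnvironment.create(EnvironmentSettings) factory; the class name is hypothetical, and the table names, paths, and connector properties are the same placeholders used in the docstrings in this patch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlUpdateExample {
        public static void main(String[] args) throws Exception {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // DDL now goes through sqlUpdate(); CREATE TABLE registers the
            // table in the current catalog.
            tEnv.sqlUpdate(
                    "create table sourceTable(a int, b varchar) with (" +
                    " connector.type = 'kafka'," +
                    " `update-mode` = 'append'," +
                    " connector.topic = 'xxx'," +
                    " connector.properties.0.key = 'k0'," +
                    " connector.properties.0.value = 'v0')");
            tEnv.sqlUpdate(
                    "create table sinkTable(a int, b varchar) with (" +
                    " connector.type = 'filesystem'," +
                    " format.type = 'csv'," +
                    " connector.path = 'xxx')");

            // INSERT also goes through sqlUpdate(); it is buffered (or eagerly
            // translated) and submitted when execute() is called.
            tEnv.sqlUpdate("insert into sinkTable select * from sourceTable");

            // SELECT-style statements keep using sqlQuery() and return a Table.
            Table result = tEnv.sqlQuery("select a + 1, b from sourceTable");
            // 'result' can be further transformed with the Table API or
            // written to a registered sink.

            tEnv.execute("MyJob");
        }
    }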
---
flink-python/pyflink/table/table_environment.py | 82 +++++++++-----
.../table/tests/test_environment_completeness.py | 3 +-
.../table/tests/test_table_environment_api.py | 4 +-
.../apache/flink/table/api/TableEnvironment.java | 119 +++++++++------------
.../table/api/internal/TableEnvironmentImpl.java | 44 ++------
.../flink/table/api/internal/TableEnvImpl.scala | 46 ++------
.../flink/table/catalog/CatalogTableITCase.scala | 48 ++++-----
.../flink/table/utils/MockTableEnvironment.scala | 2 -
8 files changed, 145 insertions(+), 203 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index f3f6497..5def53a 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -234,14 +234,49 @@ class TableEnvironment(object):
"""
return self._j_tenv.explain(table._j_table)
- def sql(self, query):
+ def sql_query(self, query):
"""
- Evaluates single sql statement including DDLs and DMLs.
+ Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
- Note: Always use this interface to execute a sql query. It only supports
- to execute one sql statement a time.
+ All tables referenced by the query must be registered in the TableEnvironment.
- A DDL statement can execute to create/drop a table/view:
+ A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+ called, for example when it is embedded in a string.
+
+ Hence, SQL queries can directly reference a :class:`Table` as follows:
+ ::
+
+ >>> table = ...
+ # the table is not registered to the table environment
+ >>> table_env.sql_query("SELECT * FROM %s" % table)
+
+ :param query: The SQL query string.
+ :return: The result :class:`Table`.
+ """
+ j_table = self._j_tenv.sqlQuery(query)
+ return Table(j_table)
+
+ def sql_update(self, stmt):
+ """
+ Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement.
+
+ .. note::
+
+ Currently only SQL INSERT statements and CREATE TABLE statements are supported.
+
+ All tables referenced by the query must be registered in the TableEnvironment.
+ A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+ called, for example when it is embedded in a string.
+ Hence, SQL queries can directly reference a :class:`Table` as follows:
+ ::
+
+ # register the table sink into which the result is inserted.
+ >>> table_env.register_table_sink("sink_table", table_sink)
+ >>> source_table = ...
+ # source_table is not registered to the table environment
+ >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
+
+ A DDL statement can also be executed to create/drop a table:
For example, the DDL statement below creates a CSV table named `tbl1`
in the current catalog::
@@ -250,18 +285,11 @@ class TableEnvironment(object):
b bigint,
c varchar
) with (
- connector = 'csv',
- csv.path = 'xxx'
+ connector.type = 'filesystem',
+ format.type = 'csv',
+ connector.path = 'xxx'
)
- The returns table format for different kind of statement:
-
- DDL: returns None.
-
- DML: a sql insert returns None; a sql query(select) returns a table
- to describe the query data set, it can be further queried through the Table API,
- or directly write to sink with :func:`Table.insert_into`.
-
SQL statements can be executed directly as follows:
::
@@ -271,9 +299,11 @@ class TableEnvironment(object):
... a int,
... b varchar
... ) with (
- ... connector = 'kafka',
- ... kafka.topic = 'xxx',
- ... kafka.endpoint = 'x.x.x'
+ ... connector.type = 'kafka',
+ ... `update-mode` = 'append',
+ ... connector.topic = 'xxx',
+ ... connector.properties.0.key = 'k0',
+ ... connector.properties.0.value = 'v0'
... )
... '''
@@ -283,8 +313,9 @@ class TableEnvironment(object):
... a int,
... b varchar
... ) with (
- ... connector = 'csv',
- ... csv.path = 'xxx'
+ ... connector.type = 'filesystem',
+ ... format.type = 'csv',
+ ... connector.path = 'xxx'
... )
... '''
@@ -294,14 +325,11 @@ class TableEnvironment(object):
>>> table_env.sql_update(query)
>>> table_env.execute("MyJob")
- This code snippet creates a job to read data from Kafka source into a CSV sink.
-
- :param query: The SQL statement to evaluate.
+ :param stmt: The SQL statement to evaluate.
"""
- j_table = self._j_tenv.sql(query)
- if j_table is None:
- return None
- return Table(j_table)
+ # type: (str) -> None
+ self._j_tenv.sqlUpdate(stmt)
def get_current_catalog(self):
"""
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index fd95fc3..1f82445 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,11 +41,10 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
# registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
# listTables should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
- # sqlQuery and sqlUpdate has been deprecated. It will be removed in the next release.
# TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
- 'getCompletionHints', 'create', 'sqlQuery', 'sqlUpdate'}
+ 'getCompletionHints', 'create'}
if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index f85d6af..3c7e0f7 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -112,7 +112,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
- result = t_env.sql("select a + 1, b, c from %s" % source)
+ result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.insert_into("sinks")
self.env.execute()
actual = source_sink_utils.results()
@@ -129,7 +129,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
- t_env.sql("insert into sinks select * from %s" % source)
+ t_env.sql_update("insert into sinks select * from %s" % source)
self.env.execute("test_sql_job")
actual = source_sink_utils.results()
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 535a6bc..c396c53 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -30,8 +30,6 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
-import javax.annotation.Nullable;
-
import java.util.Optional;
/**
@@ -323,69 +321,6 @@ public interface TableEnvironment {
String[] getCompletionHints(String statement, int position);
/**
- * Evaluates single sql statement including DDLs and DMLs.
- *
- * <p>Note: Always use this interface to execute a sql query. It only supports
- * to execute one sql statement a time.
- *
- * <p>A DDL statement can execute to create/drop a table/view:
- * For example, the below DDL statement would create a CSV table named `tbl1`
- * into the current catalog:
- * <blockquote><pre>
- * create table tbl1(
- * a int,
- * b bigint,
- * c varchar
- * ) with (
- * connector = 'csv',
- * csv.path = 'xxx'
- * )
- * </pre></blockquote>
- *
- * <p>The returns table format for different kind of statement:
- * <ul>
- * <li>DDL: returns null.</li>
- * <li>DML: a sql insert returns null; a sql query(select) returns
- * a table to describe the query data set, it can be further queried through
- * the Table API, or directly write to sink with
- * {@link #insertInto(Table, String, String...)}.</li>
- * </ul>
- *
- * <p>SQL queries can directly execute as follows:
- *
- * <blockquote><pre>
- * String sinkDDL = "create table sinkTable(
- * a int,
- * b varchar
- * ) with (
- * connector = 'csv',
- * csv.path = 'xxx'
- * )";
- *
- * String sourceDDL ="create table sourceTable(
- * a int,
- * b varchar
- * ) with (
- * connector = 'kafka',
- * kafka.topic = 'xxx',
- * kafka.endpoint = 'x.x.x'
- * )";
- *
- * String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
- *
- * tEnv.sql(sourceDDL);
- * tEnv.sql(sinkDDL);
- * tEnv.sql(query);
- * tEnv.execute("MyJob");
- * </pre></blockquote>
- * This code snippet creates a job to read data from Kafka source into a CSV sink.
- *
- * @param statement The SQL statement to evaluate.
- */
- @Nullable
- Table sql(String statement);
-
- /**
* Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
*
* <p>All tables referenced by the query must be registered in the TableEnvironment.
@@ -402,17 +337,14 @@ public interface TableEnvironment {
* }
* </pre>
*
- * @deprecated Use {@link #sql(String)}.
- *
* @param query The SQL query to evaluate.
* @return The result of the query as Table
*/
- @Deprecated
Table sqlQuery(String query);
/**
* Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
- * NOTE: Currently only SQL INSERT statements are supported.
+ * NOTE: Currently only SQL INSERT statements and CREATE TABLE statements are supported.
*
* <p>All tables referenced by the query must be registered in the TableEnvironment.
* A {@link Table} is automatically registered when its {@link Table#toString()} method is
@@ -430,11 +362,56 @@ public interface TableEnvironment {
* }
* </pre>
*
- * @deprecated Use {@link #sql(String)}.
+ * <p>A DDL statement can also be executed to create a table:
+ * For example, the DDL statement below creates a CSV table named `tbl1`
+ * in the current catalog:
+ * <blockquote><pre>
+ * create table tbl1(
+ * a int,
+ * b bigint,
+ * c varchar
+ * ) with (
+ * connector.type = 'filesystem',
+ * format.type = 'csv',
+ * connector.path = 'xxx'
+ * )
+ * </pre></blockquote>
+ *
+ * <p>SQL statements can be executed directly as follows:
+ *
+ * <blockquote><pre>
+ * String sinkDDL = "create table sinkTable(
+ * a int,
+ * b varchar
+ * ) with (
+ * connector.type = 'filesystem',
+ * format.type = 'csv',
+ * connector.path = 'xxx'
+ * )";
+ *
+ * String sourceDDL ="create table sourceTable(
+ * a int,
+ * b varchar
+ * ) with (
+ * connector.type = 'kafka',
+ * `update-mode` = 'append',
+ * connector.topic = 'xxx',
+ * connector.properties.0.key = 'k0',
+ * connector.properties.0.value = 'v0',
+ * ...
+ * )";
+ *
+ * String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
+ *
+ * tEnv.sqlUpdate(sourceDDL);
+ * tEnv.sqlUpdate(sinkDDL);
+ * tEnv.sqlUpdate(query);
+ * tEnv.execute("MyJob");
+ * </pre></blockquote>
+ * This code snippet creates a job that reads data from a Kafka source into a CSV sink.
*
* @param stmt The SQL statement to evaluate.
*/
- @Deprecated
void sqlUpdate(String stmt);
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 34685c0..9f98d2b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -61,7 +61,6 @@ import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
@@ -295,40 +294,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
- public Table sql(String statement) {
- List<Operation> operations = planner.parse(statement);
- Preconditions.checkState(operations.size() == 1,
- "sql() only accepts a single SQL statement a time.");
- Operation operation = operations.get(0);
- if (operation instanceof CreateTableOperation) {
- CreateTableOperation createTableOperation = (CreateTableOperation) operation;
- registerCatalogTableInternal(
- createTableOperation.getTablePath(),
- createTableOperation.getCatalogTable(),
- createTableOperation.isIgnoreIfExists());
- // returns null for DDL statement now.
- return null;
- } else if (operation instanceof ModifyOperation) {
- List<ModifyOperation> modifyOperations =
- Collections.singletonList((ModifyOperation) operation);
- if (isEagerOperationTranslation()) {
- translate(modifyOperations);
- } else {
- buffer(modifyOperations);
- }
- // returns null for SQL INSERT statement now.
- return null;
- } else if (operation instanceof QueryOperation){
- return createTable((QueryOperation) operation);
- } else {
- throw new ValidationException(
- "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
- "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY; or DDL of type " +
- "CREATE TABLE, CREATE VIEW; or DML of type INSERT INTO.");
- }
- }
-
- @Override
public Table sqlQuery(String query) {
List<Operation> operations = planner.parse(query);
@@ -371,7 +336,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
if (operations.size() != 1) {
throw new TableException(
- "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type INSERT.");
+ "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
+ "INSERT or CREATE TABLE");
}
Operation operation = operations.get(0);
@@ -383,6 +349,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
} else {
buffer(modifyOperations);
}
+ } else if (operation instanceof CreateTableOperation) {
+ CreateTableOperation createTableOperation = (CreateTableOperation) operation;
+ registerCatalogTableInternal(
+ createTableOperation.getTablePath(),
+ createTableOperation.getCatalogTable(),
+ createTableOperation.isIgnoreIfExists());
} else {
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3507c81..75f9fa0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -414,45 +414,6 @@ abstract class TableEnvImpl(
planner.getCompletionHints(statement, position)
}
- override def sql(statement: String): Table = {
- val planner = getFlinkPlanner
- // parse the sql query
- val parsed = planner.parse(statement)
- if (null != parsed) {
- parsed match {
- case insert: SqlInsert =>
- val query = insert.getSource
- val tableOperation = SqlToOperationConverter
- .convert(planner, query)
- .asInstanceOf[QueryOperation]
- // get query result as Table
- val queryResult = createTable(tableOperation)
-
- // get name of sink table
- val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
-
- // insert query result into sink table
- insertInto(queryResult, targetTablePath.asScala:_*)
- // returns null for SQL INSERT statement now
- null
- case createTable: SqlCreateTable =>
- val operation = SqlToOperationConverter
- .convert(planner, createTable)
- .asInstanceOf[CreateTableOperation]
- registerCatalogTableInternal(operation.getTablePath,
- operation.getCatalogTable,
- operation.isIgnoreIfExists)
- // returns null for DDL statement now
- null
- case query: SqlNode if query.getKind.belongsTo(SqlKind.QUERY) =>
- createTable(SqlToOperationConverter.convert(planner, query)
- .asInstanceOf[PlannerQueryOperation])
- }
- } else {
- throw new TableException("Unsupported SQL query!")
- }
- }
-
override def sqlQuery(query: String): Table = {
val planner = getFlinkPlanner
// parse the sql query
@@ -489,6 +450,13 @@ abstract class TableEnvImpl(
// insert query result into sink table
insertInto(queryResult, targetTablePath.asScala:_*)
+ case createTable: SqlCreateTable =>
+ val operation = SqlToOperationConverter
+ .convert(planner, createTable)
+ .asInstanceOf[CreateTableOperation]
+ registerCatalogTableInternal(operation.getTablePath,
+ operation.getCatalogTable,
+ operation.isIgnoreIfExists)
case _ =>
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index fc97071..28983c5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -131,9 +131,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|insert into t2
|select t1.a, t1.b, (t1.a + 1) as c from t1
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
}
@@ -167,9 +167,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|insert into t2(a, b)
|select t1.a, t1.b from t1
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(SOURCE_DATA.sorted, TestCollectionTableFactory.RESULT.sorted)
}
@@ -220,9 +220,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
| join t1 b
| on a.a = b.b
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
}
@@ -270,9 +270,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|insert into t2
|select sum(a), t1.b from t1 group by t1.b
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
}
@@ -311,9 +311,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
}
@@ -352,9 +352,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
}
@@ -393,9 +393,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
}
@@ -434,9 +434,9 @@ class CatalogTableITCase(isStreaming: Boolean) {
|select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
""".stripMargin
- tableEnv.sql(sourceDDL)
- tableEnv.sql(sinkDDL)
- tableEnv.sql(query)
+ tableEnv.sqlUpdate(sourceDDL)
+ tableEnv.sqlUpdate(sinkDDL)
+ tableEnv.sqlUpdate(query)
execJob("testJob")
assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index af72842..cb4567f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -96,6 +96,4 @@ class MockTableEnvironment extends TableEnvironment {
sinkPathContinued: String*): Unit = ???
override def execute(jobName: String): JobExecutionResult = ???
-
- override def sql(statement: String): Table = ???
}