You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/06/11 15:25:33 UTC

[flink] branch release-1.11 updated: [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c727999  [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env
c727999 is described below

commit c72799933a0d5ac02dfb9437bafa1b5ef38619f9
Author: Kurt Young <ku...@apache.org>
AuthorDate: Thu Jun 11 23:23:09 2020 +0800

    [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env
    
    Since we don't have a good alternative solution for easily registering user-defined sources/sinks/factories, we need these methods back, for now.
    
    This closes #12603
---
 .../apache/flink/table/api/TableEnvironment.java   | 50 ++++++++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   | 27 ++++++++++++
 .../planner/plan/stream/sql/LegacySinkTest.scala   | 18 ++++----
 .../plan/stream/table/LegacyTableSourceTest.scala  | 24 +++++------
 .../flink/table/api/internal/TableEnvImpl.scala    | 38 ++++++++++++++++
 .../table/api/stream/table/TableSourceTest.scala   | 24 +++++------
 .../runtime/batch/sql/TableEnvironmentITCase.scala | 22 +++++-----
 .../flink/table/utils/MockTableEnvironment.scala   | 12 +++++-
 8 files changed, 170 insertions(+), 45 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d5f5ce0..614b6e7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.descriptors.ConnectTableDescriptor;
@@ -546,6 +547,55 @@ public interface TableEnvironment {
 	void createTemporaryView(String path, Table view);
 
 	/**
+	 * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
+	 * Registered tables can be referenced in SQL queries.
+	 *
+	 * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+	 * be inaccessible in the current session. To make the permanent object available again one can drop the
+	 * corresponding temporary object.
+	 *
+	 * @param name        The name under which the {@link TableSource} is registered.
+	 * @param tableSource The {@link TableSource} to register.
+	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 */
+	@Deprecated
+	void registerTableSource(String name, TableSource<?> tableSource);
+
+	/**
+	 * Registers an external {@link TableSink} with given field names and types in this
+	 * {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+	 * be inaccessible in the current session. To make the permanent object available again one can drop the
+	 * corresponding temporary object.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param fieldNames The field names to register with the {@link TableSink}.
+	 * @param fieldTypes The field types to register with the {@link TableSink}.
+	 * @param tableSink The {@link TableSink} to register.
+	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 */
+	@Deprecated
+	void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
+
+	/**
+	 * Registers an external {@link TableSink} with already configured field names and field types in
+	 * this {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+	 * be inaccessible in the current session. To make the permanent object available again one can drop the
+	 * corresponding temporary object.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param configuredSink The configured {@link TableSink} to register.
+	 * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+	 */
+	@Deprecated
+	void registerTableSink(String name, TableSink<?> configuredSink);
+
+	/**
 	 * Scans a registered table and returns the resulting {@link Table}.
 	 *
 	 * <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6edce4a..cd47f35 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.core.execution.JobClient;
@@ -444,6 +445,32 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	}
 
 	@Override
+	public void registerTableSource(String name, TableSource<?> tableSource) {
+		// only accept StreamTableSource and LookupableTableSource here
+		// TODO should add a validation, while StreamTableSource is in flink-table-api-java-bridge module now
+		registerTableSourceInternal(name, tableSource);
+	}
+
+	@Override
+	public void registerTableSink(
+			String name,
+			String[] fieldNames,
+			TypeInformation<?>[] fieldTypes,
+			TableSink<?> tableSink) {
+		registerTableSink(name, tableSink.configure(fieldNames, fieldTypes));
+	}
+
+	@Override
+	public void registerTableSink(String name, TableSink<?> configuredSink) {
+		// validate
+		if (configuredSink.getTableSchema().getFieldCount() == 0) {
+			throw new TableException("Table schema cannot be empty.");
+		}
+
+		registerTableSinkInternal(name, configuredSink);
+	}
+
+	@Override
 	public Table scan(String... tablePath) {
 		UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(tablePath);
 		return scanInternal(unresolvedIdentifier)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
index 599e4e9..50f83a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
@@ -53,14 +53,14 @@ class LegacySinkTest extends TableTestBase {
     util.tableEnv.createTemporaryView("TempTable", table)
 
     val retractSink = util.createRetractTableSink(Array("cnt"), Array(LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "retractSink1", retractSink)
     stmtSet.addInsert("retractSink1", table)
 
     val table2 = util.tableEnv.sqlQuery(
       "SELECT cnt, SUM(cnt) OVER (ORDER BY PROCTIME()) FROM TempTable")
     val retractSink2 = util.createRetractTableSink(Array("cnt", "total"), Array(LONG, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "retractSink2", retractSink2)
     stmtSet.addInsert("retractSink2", table2)
 
@@ -143,13 +143,13 @@ class LegacySinkTest extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b < 4")
     val retractSink = util.createRetractTableSink(Array("b", "cnt"), Array(LONG, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "retractSink", retractSink)
     stmtSet.addInsert("retractSink", table1)
 
     val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6")
     val upsertSink = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "upsertSink", upsertSink)
     stmtSet.addInsert("upsertSink", table2)
 
@@ -165,13 +165,13 @@ class LegacySinkTest extends TableTestBase {
     val table1 = util.tableEnv.sqlQuery(
       "SELECT cnt, COUNT(b) AS frequency FROM TempTable WHERE b < 4 GROUP BY cnt")
     val upsertSink1 = util.createUpsertTableSink(Array(0), Array("b", "cnt"), Array(LONG, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "upsertSink1", upsertSink1)
     stmtSet.addInsert("upsertSink1", table1)
 
     val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6")
     val upsertSink2 = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "upsertSink2", upsertSink2)
     stmtSet.addInsert("upsertSink2", table2)
 
@@ -189,7 +189,7 @@ class LegacySinkTest extends TableTestBase {
     util.tableEnv.registerTable("TempTable", table)
 
     val appendSink = util.createAppendTableSink(Array("a", "b"), Array(INT, LONG))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "appendSink", appendSink)
     stmtSet.addInsert("appendSink", table)
 
@@ -199,13 +199,13 @@ class LegacySinkTest extends TableTestBase {
 
     val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable1")
     val retractSink = util.createRetractTableSink(Array("total_sum"), Array(INT))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "retractSink", retractSink)
     stmtSet.addInsert("retractSink", table2)
 
     val table3 = util.tableEnv.sqlQuery("SELECT MIN(a) AS total_min FROM TempTable1")
     val upsertSink = util.createUpsertTableSink(Array(), Array("total_min"), Array(INT))
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    util.tableEnv.registerTableSink(
       "upsertSink", upsertSink)
     stmtSet.addInsert("upsertSink", table3)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
index 72d2c11..fa35afe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
@@ -41,7 +41,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](
         false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -62,7 +62,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](
         false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -83,7 +83,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](
         false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -107,7 +107,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "procTimeT",
       new TestTableSourceWithTime[Row](
         false, tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -127,7 +127,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "procTimeT",
       new TestTableSourceWithTime[Row](
         false, tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -150,7 +150,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "name", "val", "rtime"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -170,7 +170,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "name", "val", "rtime"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -190,7 +190,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -210,7 +210,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -230,7 +230,7 @@ class LegacyTableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -251,7 +251,7 @@ class LegacyTableSourceTest extends TableTestBase {
     val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestLegacyProjectableTableSource(
         false, tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
@@ -287,7 +287,7 @@ class LegacyTableSourceTest extends TableTestBase {
         Array("id", "deepNested", "nested", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestNestedProjectableTableSource(
         false, tableSchema, returnType, Seq()))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 905a031..16f04e1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -322,6 +322,44 @@ abstract class TableEnvImpl(
       false)
   }
 
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
+    validateTableSource(tableSource)
+    registerTableSourceInternal(name, tableSource)
+  }
+
+  override def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = {
+
+    if (fieldNames == null) {
+      throw new TableException("fieldNames must not be null.")
+    }
+    if (fieldTypes == null) {
+      throw new TableException("fieldTypes must not be null.")
+    }
+    if (fieldNames.length == 0) {
+      throw new TableException("fieldNames must not be empty.")
+    }
+    if (fieldNames.length != fieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+    // validate
+    if (configuredSink.getTableSchema.getFieldNames.length == 0) {
+      throw new TableException("Field names must not be empty.")
+    }
+
+    validateTableSink(configuredSink)
+    registerTableSinkInternal(name, configuredSink)
+  }
+
   override def fromTableSource(source: TableSource[_]): Table = {
     createTable(new TableSourceQueryOperation(source, isBatchTable))
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index 8c2d426..75367d5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -42,7 +42,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
@@ -66,7 +66,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
@@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "rowTimeT",
       new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
@@ -133,7 +133,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "procTimeT",
       new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
 
@@ -161,7 +161,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "procTimeT",
       new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
 
@@ -200,7 +200,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "name", "val", "rtime"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
@@ -223,7 +223,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "name", "val", "rtime"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
@@ -249,7 +249,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
@@ -271,7 +271,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
@@ -293,7 +293,7 @@ class TableSourceTest extends TableTestBase {
       Array("id", "rtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
@@ -317,7 +317,7 @@ class TableSourceTest extends TableTestBase {
     val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
 
@@ -356,7 +356,7 @@ class TableSourceTest extends TableTestBase {
         Array("id", "deepNested", "nested", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+    util.tableEnv.registerTableSource(
       "T",
       new TestNestedProjectableTableSource(tableSchema, returnType, Seq()))
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index c68a7c7..30db7ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -150,7 +150,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink.configure(fieldNames, fieldTypes))
 
     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -173,7 +173,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink.configure(fieldNames, fieldTypes))
 
     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -220,7 +220,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f", "g")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink.configure(fieldNames, fieldTypes))
 
     val sql = "INSERT INTO targetTable SELECT * FROM sourceTable where id > 7"
@@ -259,7 +259,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink.configure(fieldNames, fieldTypes))
 
     val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count)
@@ -294,7 +294,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink1.configure(fieldNames, fieldTypes))
 
     val tableResult = tEnv.executeSql("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
@@ -319,7 +319,7 @@ class TableEnvironmentITCase(
     val sinkPath = resultFile.getAbsolutePath
     val configuredSink = new TestingOverwritableTableSink(sinkPath)
       .configure(Array("d"),  Array(STRING))
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+    tEnv.registerTableSink("MySink", configuredSink)
 
     val tableResult1 = tEnv.executeSql("INSERT overwrite MySink SELECT c FROM sourceTable")
     checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink")
@@ -354,7 +354,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink1.configure(fieldNames, fieldTypes))
 
     val sink1Path = registerCsvTableSink(tEnv, fieldNames, fieldTypes, "MySink1")
@@ -392,7 +392,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink.configure(fieldNames, fieldTypes))
 
     val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count)
@@ -431,7 +431,7 @@ class TableEnvironmentITCase(
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
     val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+    tEnv.registerTableSink(
       "targetTable", sink1.configure(fieldNames, fieldTypes))
 
     val table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
@@ -457,7 +457,7 @@ class TableEnvironmentITCase(
     val sinkPath = resultFile.getAbsolutePath
     val configuredSink = new TestingOverwritableTableSink(sinkPath)
       .configure(Array("d"),  Array(STRING))
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+    tEnv.registerTableSink("MySink", configuredSink)
 
     val tableResult1 = tEnv.sqlQuery("SELECT c FROM sourceTable").executeInsert("MySink", true)
     checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink")
@@ -543,7 +543,7 @@ class TableEnvironmentITCase(
     val sinkPath = _tempFolder.newFile().getAbsolutePath
     val configuredSink = new TestingOverwritableTableSink(sinkPath)
       .configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+    tEnv.registerTableSink("MySink", configuredSink)
     assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
 
     val stmtSet = tEnv.createStatementSet()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 0f69ff3..d608765 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,14 +20,15 @@ package org.apache.flink.table.utils
 
 import java.lang.{Iterable => JIterable}
 import java.util.Optional
-
 import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
 import org.apache.flink.table.module.Module
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.AbstractDataType
 
@@ -39,6 +40,15 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def registerTable(name: String, table: Table): Unit = ???
 
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
+
+  override def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit = ???
+
+  override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = ???
+
   override def scan(tablePath: String*): Table = ???
 
   override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = ???