You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2020/06/11 15:25:33 UTC
[flink] branch release-1.11 updated: [FLINK-16375][table]
Reintroduce registerTable[Source/Sink] methods for table env
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c727999 [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env
c727999 is described below
commit c72799933a0d5ac02dfb9437bafa1b5ef38619f9
Author: Kurt Young <ku...@apache.org>
AuthorDate: Thu Jun 11 23:23:09 2020 +0800
[FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env
Since we don't have a good alternative solution for easily registering user-defined sources/sinks/factories, we need these methods back for now.
This closes #12603
---
.../apache/flink/table/api/TableEnvironment.java | 50 ++++++++++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 27 ++++++++++++
.../planner/plan/stream/sql/LegacySinkTest.scala | 18 ++++----
.../plan/stream/table/LegacyTableSourceTest.scala | 24 +++++------
.../flink/table/api/internal/TableEnvImpl.scala | 38 ++++++++++++++++
.../table/api/stream/table/TableSourceTest.scala | 24 +++++------
.../runtime/batch/sql/TableEnvironmentITCase.scala | 22 +++++-----
.../flink/table/utils/MockTableEnvironment.scala | 12 +++++-
8 files changed, 170 insertions(+), 45 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d5f5ce0..614b6e7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
@@ -546,6 +547,55 @@ public interface TableEnvironment {
void createTemporaryView(String path, Table view);
/**
+ * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object available again one can drop the
+ * corresponding temporary object.
+ *
+ * @param name The name under which the {@link TableSource} is registered.
+ * @param tableSource The {@link TableSource} to register.
+ * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ */
+ @Deprecated
+ void registerTableSource(String name, TableSource<?> tableSource);
+
+ /**
+ * Registers an external {@link TableSink} with given field names and types in this
+ * {@link TableEnvironment}'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object available again one can drop the
+ * corresponding temporary object.
+ *
+ * @param name The name under which the {@link TableSink} is registered.
+ * @param fieldNames The field names to register with the {@link TableSink}.
+ * @param fieldTypes The field types to register with the {@link TableSink}.
+ * @param tableSink The {@link TableSink} to register.
+ * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ */
+ @Deprecated
+ void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink);
+
+ /**
+ * Registers an external {@link TableSink} with already configured field names and field types in
+ * this {@link TableEnvironment}'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will
+ * be inaccessible in the current session. To make the permanent object available again one can drop the
+ * corresponding temporary object.
+ *
+ * @param name The name under which the {@link TableSink} is registered.
+ * @param configuredSink The configured {@link TableSink} to register.
+ * @deprecated Use {@link #connect(ConnectorDescriptor)} instead.
+ */
+ @Deprecated
+ void registerTableSink(String name, TableSink<?> configuredSink);
+
+ /**
* Scans a registered table and returns the resulting {@link Table}.
*
* <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6edce4a..cd47f35 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.execution.JobClient;
@@ -444,6 +445,32 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
@Override
+ public void registerTableSource(String name, TableSource<?> tableSource) {
+ // only accept StreamTableSource and LookupableTableSource here
+ // TODO should add a validation, while StreamTableSource is in flink-table-api-java-bridge module now
+ registerTableSourceInternal(name, tableSource);
+ }
+
+ @Override
+ public void registerTableSink(
+ String name,
+ String[] fieldNames,
+ TypeInformation<?>[] fieldTypes,
+ TableSink<?> tableSink) {
+ registerTableSink(name, tableSink.configure(fieldNames, fieldTypes));
+ }
+
+ @Override
+ public void registerTableSink(String name, TableSink<?> configuredSink) {
+ // validate
+ if (configuredSink.getTableSchema().getFieldCount() == 0) {
+ throw new TableException("Table schema cannot be empty.");
+ }
+
+ registerTableSinkInternal(name, configuredSink);
+ }
+
+ @Override
public Table scan(String... tablePath) {
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(tablePath);
return scanInternal(unresolvedIdentifier)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
index 599e4e9..50f83a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala
@@ -53,14 +53,14 @@ class LegacySinkTest extends TableTestBase {
util.tableEnv.createTemporaryView("TempTable", table)
val retractSink = util.createRetractTableSink(Array("cnt"), Array(LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"retractSink1", retractSink)
stmtSet.addInsert("retractSink1", table)
val table2 = util.tableEnv.sqlQuery(
"SELECT cnt, SUM(cnt) OVER (ORDER BY PROCTIME()) FROM TempTable")
val retractSink2 = util.createRetractTableSink(Array("cnt", "total"), Array(LONG, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"retractSink2", retractSink2)
stmtSet.addInsert("retractSink2", table2)
@@ -143,13 +143,13 @@ class LegacySinkTest extends TableTestBase {
val table1 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b < 4")
val retractSink = util.createRetractTableSink(Array("b", "cnt"), Array(LONG, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"retractSink", retractSink)
stmtSet.addInsert("retractSink", table1)
val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6")
val upsertSink = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"upsertSink", upsertSink)
stmtSet.addInsert("upsertSink", table2)
@@ -165,13 +165,13 @@ class LegacySinkTest extends TableTestBase {
val table1 = util.tableEnv.sqlQuery(
"SELECT cnt, COUNT(b) AS frequency FROM TempTable WHERE b < 4 GROUP BY cnt")
val upsertSink1 = util.createUpsertTableSink(Array(0), Array("b", "cnt"), Array(LONG, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"upsertSink1", upsertSink1)
stmtSet.addInsert("upsertSink1", table1)
val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6")
val upsertSink2 = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"upsertSink2", upsertSink2)
stmtSet.addInsert("upsertSink2", table2)
@@ -189,7 +189,7 @@ class LegacySinkTest extends TableTestBase {
util.tableEnv.registerTable("TempTable", table)
val appendSink = util.createAppendTableSink(Array("a", "b"), Array(INT, LONG))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"appendSink", appendSink)
stmtSet.addInsert("appendSink", table)
@@ -199,13 +199,13 @@ class LegacySinkTest extends TableTestBase {
val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable1")
val retractSink = util.createRetractTableSink(Array("total_sum"), Array(INT))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"retractSink", retractSink)
stmtSet.addInsert("retractSink", table2)
val table3 = util.tableEnv.sqlQuery("SELECT MIN(a) AS total_min FROM TempTable1")
val upsertSink = util.createUpsertTableSink(Array(), Array("total_min"), Array(INT))
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ util.tableEnv.registerTableSink(
"upsertSink", upsertSink)
stmtSet.addInsert("upsertSink", table3)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
index 72d2c11..fa35afe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
@@ -41,7 +41,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](
false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -62,7 +62,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](
false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -83,7 +83,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](
false, tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -107,7 +107,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"procTimeT",
new TestTableSourceWithTime[Row](
false, tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -127,7 +127,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"procTimeT",
new TestTableSourceWithTime[Row](
false, tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -150,7 +150,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "name", "val", "rtime"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -170,7 +170,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "name", "val", "rtime"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -190,7 +190,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -210,7 +210,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -230,7 +230,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -251,7 +251,7 @@ class LegacyTableSourceTest extends TableTestBase {
val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestLegacyProjectableTableSource(
false, tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
@@ -287,7 +287,7 @@ class LegacyTableSourceTest extends TableTestBase {
Array("id", "deepNested", "nested", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestNestedProjectableTableSource(
false, tableSchema, returnType, Seq()))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 905a031..16f04e1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -322,6 +322,44 @@ abstract class TableEnvImpl(
false)
}
+ override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
+ validateTableSource(tableSource)
+ registerTableSourceInternal(name, tableSource)
+ }
+
+ override def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]],
+ tableSink: TableSink[_]): Unit = {
+
+ if (fieldNames == null) {
+ throw new TableException("fieldNames must not be null.")
+ }
+ if (fieldTypes == null) {
+ throw new TableException("fieldTypes must not be null.")
+ }
+ if (fieldNames.length == 0) {
+ throw new TableException("fieldNames must not be empty.")
+ }
+ if (fieldNames.length != fieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+ registerTableSinkInternal(name, configuredSink)
+ }
+
+ override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+ // validate
+ if (configuredSink.getTableSchema.getFieldNames.length == 0) {
+ throw new TableException("Field names must not be empty.")
+ }
+
+ validateTableSink(configuredSink)
+ registerTableSinkInternal(name, configuredSink)
+ }
+
override def fromTableSource(source: TableSource[_]): Table = {
createTable(new TableSourceQueryOperation(source, isBatchTable))
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index 8c2d426..75367d5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -42,7 +42,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -66,7 +66,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rowtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"rowTimeT",
new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
@@ -133,7 +133,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"procTimeT",
new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -161,7 +161,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"procTimeT",
new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
@@ -200,7 +200,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "name", "val", "rtime"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -223,7 +223,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "name", "val", "rtime"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -249,7 +249,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -271,7 +271,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -293,7 +293,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "rtime", "val", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
@@ -317,7 +317,7 @@ class TableSourceTest extends TableTestBase {
val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
@@ -356,7 +356,7 @@ class TableSourceTest extends TableTestBase {
Array("id", "deepNested", "nested", "name"))
val util = streamTestUtil()
- util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ util.tableEnv.registerTableSource(
"T",
new TestNestedProjectableTableSource(tableSchema, returnType, Seq()))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index c68a7c7..30db7ed 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -150,7 +150,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink.configure(fieldNames, fieldTypes))
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -173,7 +173,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink.configure(fieldNames, fieldTypes))
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -220,7 +220,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f", "g")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink.configure(fieldNames, fieldTypes))
val sql = "INSERT INTO targetTable SELECT * FROM sourceTable where id > 7"
@@ -259,7 +259,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink.configure(fieldNames, fieldTypes))
val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count)
@@ -294,7 +294,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink1.configure(fieldNames, fieldTypes))
val tableResult = tEnv.executeSql("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
@@ -319,7 +319,7 @@ class TableEnvironmentITCase(
val sinkPath = resultFile.getAbsolutePath
val configuredSink = new TestingOverwritableTableSink(sinkPath)
.configure(Array("d"), Array(STRING))
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+ tEnv.registerTableSink("MySink", configuredSink)
val tableResult1 = tEnv.executeSql("INSERT overwrite MySink SELECT c FROM sourceTable")
checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink")
@@ -354,7 +354,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink1.configure(fieldNames, fieldTypes))
val sink1Path = registerCsvTableSink(tEnv, fieldNames, fieldTypes, "MySink1")
@@ -392,7 +392,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink.configure(fieldNames, fieldTypes))
val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count)
@@ -431,7 +431,7 @@ class TableEnvironmentITCase(
val fieldNames = Array("d", "e", "f")
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes
val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ tEnv.registerTableSink(
"targetTable", sink1.configure(fieldNames, fieldTypes))
val table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
@@ -457,7 +457,7 @@ class TableEnvironmentITCase(
val sinkPath = resultFile.getAbsolutePath
val configuredSink = new TestingOverwritableTableSink(sinkPath)
.configure(Array("d"), Array(STRING))
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+ tEnv.registerTableSink("MySink", configuredSink)
val tableResult1 = tEnv.sqlQuery("SELECT c FROM sourceTable").executeInsert("MySink", true)
checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink")
@@ -543,7 +543,7 @@ class TableEnvironmentITCase(
val sinkPath = _tempFolder.newFile().getAbsolutePath
val configuredSink = new TestingOverwritableTableSink(sinkPath)
.configure(Array("d", "e", "f"), Array(INT, LONG, STRING))
- tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink)
+ tEnv.registerTableSink("MySink", configuredSink)
assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty)
val stmtSet = tEnv.createStatementSet()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 0f69ff3..d608765 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,14 +20,15 @@ package org.apache.flink.table.utils
import java.lang.{Iterable => JIterable}
import java.util.Optional
-
import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.table.module.Module
+import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.AbstractDataType
@@ -39,6 +40,15 @@ class MockTableEnvironment extends TableEnvironment {
override def registerTable(name: String, table: Table): Unit = ???
+ override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
+
+ override def registerTableSink(
+ name: String,
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit = ???
+
+ override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = ???
+
override def scan(tablePath: String*): Table = ???
override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = ???