You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/15 17:14:15 UTC
[flink] branch release-1.9 updated: [FLINK-13170][table-planner]
Planner should get table factory from catalog when creating sink for
CatalogTable
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 7b4f39d [FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable
7b4f39d is described below
commit 7b4f39d9482fcba2526d5fe1adef9850de3473fc
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Jul 9 20:09:20 2019 +0800
[FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable
Planner should first try getting the table factory from the catalog when creating table sinks for a CatalogTable.
This closes #9039.
---
.../batch/connectors/hive/HiveTableSinkTest.java | 19 ++++++---------
.../flink/table/factories/TableFactoryUtil.java | 14 +++++++++++
.../apache/flink/table/planner/PlannerBase.scala | 15 ++++++++++--
.../flink/table/plan/batch/sql/SinkTest.scala | 28 ++++++++++++++++++++++
.../flink/table/api/internal/TableEnvImpl.scala | 13 +++++++++-
.../apache/flink/table/planner/StreamPlanner.scala | 13 +++++++++-
6 files changed, 86 insertions(+), 16 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index fe54eac..d16f2b0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -88,9 +88,8 @@ public class HiveTableSinkTest {
List<Row> toWrite = generateRecords(5);
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
- CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
- tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
- tableEnv.sqlQuery("select * from src").insertInto("destSink");
+ tableEnv.registerCatalog("hive", hiveCatalog);
+ tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
execEnv.execute();
verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
@@ -110,9 +109,8 @@ public class HiveTableSinkTest {
List<Row> toWrite = generateRecords(5);
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
- CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
- tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
- tableEnv.sqlQuery("select * from src").insertInto("destSink");
+ tableEnv.registerCatalog("hive", hiveCatalog);
+ tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
execEnv.execute();
List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
@@ -156,9 +154,8 @@ public class HiveTableSinkTest {
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
- CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
- tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
- tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
+ tableEnv.registerCatalog("hive", hiveCatalog);
+ tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest");
execEnv.execute();
List<String> result = hiveShell.executeQuery("select * from " + tblName);
@@ -185,9 +182,7 @@ public class HiveTableSinkTest {
toWrite.add(row);
tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
- catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
- tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
- tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
+ tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest");
execEnv.execute();
result = hiveShell.executeQuery("select * from " + tblName);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 1a641f4..6bef62d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -19,13 +19,16 @@
package org.apache.flink.table.factories;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import java.util.Map;
+import java.util.Optional;
/**
* Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}.
@@ -101,4 +104,15 @@ public class TableFactoryUtil {
return findAndCreateTableSource(table.toProperties());
}
+ /**
+ * Creates a table sink for a {@link CatalogTable} using the table factory associated with the catalog.
+ */
+ public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, ObjectPath tablePath) {
+ TableFactory tableFactory = catalog.getTableFactory().orElse(null);
+ if (tableFactory instanceof TableSinkFactory) {
+ return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(tablePath, catalogTable));
+ }
+ return Optional.empty();
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index be81a0b..021a650 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog, ObjectPath}
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.executor.ExecutorBase
import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl
@@ -248,7 +248,18 @@ abstract class PlannerBase(
case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
.exists(_.isInstanceOf[CatalogTable]) =>
- val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+ val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+ val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+ if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+ val dbName = s.getTablePath.get(1)
+ val tableName = s.getTablePath.get(2)
+ val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+ catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+ if (sink.isPresent) {
+ return Option(sink.get())
+ }
+ }
+ val sinkProperties = catalogTable.toProperties
Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
.createTableSink(sinkProperties))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
index c9bfcc4..a409ffe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
@@ -19,12 +19,19 @@
package org.apache.flink.table.plan.batch.sql
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{DataTypes, TableSchema}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
+import org.apache.flink.table.factories.TableSinkFactory
import org.apache.flink.table.plan.optimize.RelNodeBlockPlanBuilder
+import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.logical.{BigIntType, IntType}
import org.apache.flink.table.util.TableTestBase
+import java.util.Optional
import org.junit.Test
+import org.mockito.{ArgumentMatchers, Mockito}
+import scala.collection.JavaConverters._
class SinkTest extends TableTestBase {
@@ -60,4 +67,25 @@ class SinkTest extends TableTestBase {
util.verifyPlan()
}
+ @Test
+ def testCatalogTableSink(): Unit = {
+ val schemaBuilder = new TableSchema.Builder()
+ schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
+ val schema = schemaBuilder.build()
+ val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
+ val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
+ val factory = Mockito.mock(classOf[TableSinkFactory[_]])
+ Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
+ Mockito.when[TableSink[_]](factory.createTableSink(
+ ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
+ util.tableEnv.registerCatalog(catalog.getName, catalog)
+ util.tableEnv.useCatalog(catalog.getName)
+ val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
+ catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
+ util.tableEnv.sqlQuery("select 1").insertInto("tbl")
+ util.tableEnv.explain(false)
+ // verify we tried to get table factory from catalog
+ Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
+ }
+
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index abb94c9..13d6c53 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -552,7 +552,18 @@ abstract class TableEnvImpl(
case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
.exists(_.isInstanceOf[CatalogTable]) =>
- val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+ val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+ val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+ if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+ val dbName = s.getTablePath.get(1)
+ val tableName = s.getTablePath.get(2)
+ val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+ catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+ if (sink.isPresent) {
+ return Option(sink.get())
+ }
+ }
+ val sinkProperties = catalogTable.toProperties
Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
.createTableSink(sinkProperties))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index aa41b0c..de163a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -415,7 +415,18 @@ class StreamPlanner(
case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
.exists(_.isInstanceOf[CatalogTable]) =>
- val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+ val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+ val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+ if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+ val dbName = s.getTablePath.get(1)
+ val tableName = s.getTablePath.get(2)
+ val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+ catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+ if (sink.isPresent) {
+ return Option(sink.get())
+ }
+ }
+ val sinkProperties = catalogTable.toProperties
Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
.createTableSink(sinkProperties))