You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/18 22:38:21 UTC
[flink] 02/02: [FLINK-10079] [table] Look up sink tables in external catalogs.
This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc37d7a7555aa7f18da2d0eb62c569cb080332ed
Author: jerryjzhang <zh...@163.com>
AuthorDate: Fri Sep 14 16:26:36 2018 +0800
[FLINK-10079] [table] Look up sink tables in external catalogs.
This closes #6508.
---
.../flink/table/api/BatchTableEnvironment.scala | 4 +-
.../flink/table/api/StreamTableEnvironment.scala | 4 +-
.../apache/flink/table/api/TableEnvironment.scala | 60 ++++++++----
.../table/catalog/ExternalCatalogSchema.scala | 2 +-
.../table/api/ExternalCatalogInsertTest.scala | 107 +++++++++++++++++++++
.../table/catalog/ExternalCatalogSchemaTest.scala | 2 +-
.../flink/table/runtime/utils/CommonTestData.scala | 23 +++++
7 files changed, 179 insertions(+), 23 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 9265f0f..04a7916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class BatchTableEnvironment(
// check for proper batch table source
case batchTableSource: BatchTableSource[_] =>
// check if a table (source or sink) is registered
- Option(getTable(name)) match {
+ getTable(name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -249,7 +249,7 @@ abstract class BatchTableEnvironment(
case _: BatchTableSink[_] =>
// check if a table (source or sink) is registered
- Option(getTable(name)) match {
+ getTable(name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4c73032..d31ce6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -126,7 +126,7 @@ abstract class StreamTableEnvironment(
}
// register
- Option(getTable(name)) match {
+ getTable(name) match {
// check if a table (source or sink) is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -273,7 +273,7 @@ abstract class StreamTableEnvironment(
case _: StreamTableSink[_] =>
// check if a table (source or sink) is registered
- Option(getTable(name)) match {
+ getTable(name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 195812d..5691ab7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql._
@@ -749,42 +748,42 @@ abstract class TableEnvironment(val config: TableConfig) {
// check that sink table exists
if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
- if (!isRegistered(sinkTableName)) {
- throw TableException(s"No table was registered under the name $sinkTableName.")
- }
getTable(sinkTableName) match {
- // check for registered table that wraps a sink
- case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+ case None =>
+ throw TableException(s"No table was registered under the name $sinkTableName.")
+
+ case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined =>
val tableSink = s.tableSinkTable.get.tableSink
// validate schema of source table and table sink
val srcFieldTypes = table.getSchema.getTypes
val sinkFieldTypes = tableSink.getFieldTypes
if (srcFieldTypes.length != sinkFieldTypes.length ||
- srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+ srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) {
val srcFieldNames = table.getSchema.getColumnNames
val sinkFieldNames = tableSink.getFieldNames
// format table and table sink schema strings
val srcSchema = srcFieldNames.zip(srcFieldTypes)
- .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
.mkString("[", ", ", "]")
val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
- .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+ .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
.mkString("[", ", ", "]")
throw ValidationException(
- s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+ s"Field types of query result and registered TableSink " +
+ s"$sinkTableName do not match.\n" +
s"Query result schema: $srcSchema\n" +
s"TableSink schema: $sinkSchema")
}
-
// emit the table to the configured table sink
writeToSink(table, tableSink, conf)
- case _ =>
+
+ case Some(_) =>
throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
s"You can only emit query results to a registered TableSink.")
}
@@ -828,12 +827,39 @@ abstract class TableEnvironment(val config: TableConfig) {
rootSchema.getTableNames.contains(name)
}
- protected def getTable(name: String): org.apache.calcite.schema.Table = {
- rootSchema.getTable(name)
- }
+ /**
+ * Get a table from either internal or external catalogs.
+ *
+ * @param name The name of the table.
+ * @return The table registered either internally or externally, None otherwise.
+ */
+ protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = {
+
+ // recursively fetches a table from a schema.
+ def getTableFromSchema(
+ schema: SchemaPlus,
+ path: List[String]): Option[org.apache.calcite.schema.Table] = {
+
+ path match {
+ case tableName :: Nil =>
+ // look up table
+ Option(schema.getTable(tableName))
+ case subschemaName :: remain =>
+ // look up subschema
+ val subschema = Option(schema.getSubSchema(subschemaName))
+ subschema match {
+ case Some(s) =>
+ // search for table in subschema
+ getTableFromSchema(s, remain)
+ case None =>
+ // subschema does not exist
+ None
+ }
+ }
+ }
- protected def getRowType(name: String): RelDataType = {
- rootSchema.getTable(name).getRowType(typeFactory)
+ val pathNames = name.split('.').toList
+ getTableFromSchema(rootSchema, pathNames)
}
/** Returns a unique temporary attribute name. */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 022b9a2..3129ad3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -96,7 +96,7 @@ class ExternalCatalogSchema(
override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
- override def getTableNames: JSet[String] = JCollections.emptySet[String]
+ override def getTableNames: JSet[String] = new JLinkedHashSet(catalog.listTables())
override def snapshot(v: SchemaVersion): Schema = this
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
new file mode 100644
index 0000000..4b1fb18
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+ * Test for inserting into tables from external catalog.
+ */
+class ExternalCatalogInsertTest extends TableTestBase {
+ private val tableBatchEnv = TableEnvironment.getTableEnvironment(
+ ExecutionEnvironment.getExecutionEnvironment)
+ private val tableStreamEnv = TableEnvironment.getTableEnvironment(
+ StreamExecutionEnvironment.getExecutionEnvironment)
+
+ @Test
+ def testBatchTableApi(): Unit = {
+ tableBatchEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+ val table1 = tableBatchEnv.scan("test", "db1", "tb1")
+ val table2 = tableBatchEnv.scan("test", "db2", "tb2")
+ table2.select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+ .insertInto("test.db3.tb3")
+ }
+
+ @Test
+ def testBatchSQL(): Unit = {
+ tableBatchEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+ val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+ "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ tableBatchEnv.sqlUpdate(sqlInsert)
+ }
+
+ @Test
+ def testStreamTableApi(): Unit = {
+ var tableEnv = tableStreamEnv
+
+ tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+ val table1 = tableEnv.scan("test", "db1", "tb1")
+ val table2 = tableEnv.scan("test", "db2", "tb2")
+
+ table2.where("d < 3")
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+ .insertInto("test.db3.tb3")
+ }
+
+ @Test
+ def testStreamSQL(): Unit = {
+ var tableEnv = tableStreamEnv
+
+ tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+ val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+ "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ tableEnv.sqlUpdate(sqlInsert)
+ }
+
+ @Test
+ def testTopLevelTable(): Unit = {
+ var tableEnv = tableBatchEnv
+
+ tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+ val table1 = tableEnv.scan("test", "tb1")
+ val table2 = tableEnv.scan("test", "db2", "tb2")
+ table2.select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+ .insertInto("test.tb3")
+ }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c98a7c1..2ca7fba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -71,7 +71,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
.filter(_.getType.equals(SqlMonikerType.SCHEMA))
.map(_.getFullyQualifiedNames.asScala.toList).toSet
assertTrue(Set(List(schemaName), List(schemaName, "db1"),
- List(schemaName, "db2")) == subSchemas)
+ List(schemaName, "db2"), List(schemaName, "db3")) == subSchemas)
}
@Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e62396f..64fcc8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -129,16 +129,39 @@ object CommonTestData {
externalTableBuilder2.inAppendMode()
}
+ val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
+ val connDesc3 = FileSystem().path(tempFilePath3)
+ val formatDesc3 = Csv()
+ .field("x", Types.INT)
+ .field("y", Types.LONG)
+ .field("z", Types.STRING)
+ .fieldDelimiter("#")
+ val schemaDesc3 = Schema()
+ .field("x", Types.INT)
+ .field("y", Types.LONG)
+ .field("z", Types.STRING)
+ val externalTableBuilder3 = ExternalCatalogTable.builder(connDesc3)
+ .withFormat(formatDesc3)
+ .withSchema(schemaDesc3)
+
+ if (isStreaming) {
+ externalTableBuilder3.inAppendMode()
+ }
+
val catalog = new InMemoryExternalCatalog("test")
val db1 = new InMemoryExternalCatalog("db1")
val db2 = new InMemoryExternalCatalog("db2")
+ val db3 = new InMemoryExternalCatalog("db3")
catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+ catalog.createSubCatalog("db3", db3, ignoreIfExists = false)
// Register the table with both catalogs
catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
+ catalog.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false)
+ db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
catalog
}