Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/18 22:38:21 UTC

[flink] 02/02: [FLINK-10079] [table] Look up sink tables in external catalogs.

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc37d7a7555aa7f18da2d0eb62c569cb080332ed
Author: jerryjzhang <zh...@163.com>
AuthorDate: Fri Sep 14 16:26:36 2018 +0800

    [FLINK-10079] [table] Look up sink tables in external catalogs.
    
    This closes #6508.
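    
    For context, the new ExternalCatalogInsertTest added below exercises this
    lookup end-to-end. A minimal sketch of the usage it enables, based purely
    on that test (the "test"/"db1"/"db3.tb3" names and the
    CommonTestData.getInMemoryTestCatalog helper are the test fixtures, not
    part of the API):
    
        import org.apache.flink.api.scala.ExecutionEnvironment
        import org.apache.flink.table.api.TableEnvironment
        import org.apache.flink.table.api.scala._
        import org.apache.flink.table.runtime.utils.CommonTestData
    
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
    
        // register an external catalog that contains both source and sink tables
        tEnv.registerExternalCatalog(
          "test",
          CommonTestData.getInMemoryTestCatalog(isStreaming = false))
    
        // scan a source table from the external catalog ...
        val src = tEnv.scan("test", "db1", "tb1")
    
        // ... and emit the result into a sink table of the same catalog,
        // addressed by its dotted path; before this change, only tables
        // registered directly in the TableEnvironment were found as sinks
        src.select('a * 2, 'b, 'c.upperCase())
          .insertInto("test.db3.tb3")
    
        // the same sink table can be targeted from SQL
        tEnv.sqlUpdate("INSERT INTO `test.db3.tb3` SELECT a * 2, b, c FROM test.db1.tb1")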
---
 .../flink/table/api/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   4 +-
 .../apache/flink/table/api/TableEnvironment.scala  |  60 ++++++++----
 .../table/catalog/ExternalCatalogSchema.scala      |   2 +-
 .../table/api/ExternalCatalogInsertTest.scala      | 107 +++++++++++++++++++++
 .../table/catalog/ExternalCatalogSchemaTest.scala  |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  23 +++++
 7 files changed, 179 insertions(+), 23 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 9265f0f..04a7916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class BatchTableEnvironment(
       // check for proper batch table source
       case batchTableSource: BatchTableSource[_] =>
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -249,7 +249,7 @@ abstract class BatchTableEnvironment(
       case _: BatchTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4c73032..d31ce6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -126,7 +126,7 @@ abstract class StreamTableEnvironment(
         }
 
         // register
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // check if a table (source or sink) is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -273,7 +273,7 @@ abstract class StreamTableEnvironment(
       case _: StreamTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 195812d..5691ab7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
@@ -749,42 +748,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     // check that sink table exists
     if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
     if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
-    if (!isRegistered(sinkTableName)) {
-      throw TableException(s"No table was registered under the name $sinkTableName.")
-    }
 
     getTable(sinkTableName) match {
 
-      // check for registered table that wraps a sink
-      case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+      case None =>
+        throw TableException(s"No table was registered under the name $sinkTableName.")
+
+      case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined =>
         val tableSink = s.tableSinkTable.get.tableSink
         // validate schema of source table and table sink
         val srcFieldTypes = table.getSchema.getTypes
         val sinkFieldTypes = tableSink.getFieldTypes
 
         if (srcFieldTypes.length != sinkFieldTypes.length ||
-          srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+          srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) {
 
           val srcFieldNames = table.getSchema.getColumnNames
           val sinkFieldNames = tableSink.getFieldNames
 
           // format table and table sink schema strings
           val srcSchema = srcFieldNames.zip(srcFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
           val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
 
           throw ValidationException(
-            s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+            s"Field types of query result and registered TableSink " +
+              s"$sinkTableName do not match.\n" +
               s"Query result schema: $srcSchema\n" +
               s"TableSink schema:    $sinkSchema")
         }
-
         // emit the table to the configured table sink
         writeToSink(table, tableSink, conf)
-      case _ =>
+
+      case Some(_) =>
         throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
           s"You can only emit query results to a registered TableSink.")
     }
@@ -828,12 +827,39 @@ abstract class TableEnvironment(val config: TableConfig) {
     rootSchema.getTableNames.contains(name)
   }
 
-  protected def getTable(name: String): org.apache.calcite.schema.Table = {
-    rootSchema.getTable(name)
-  }
+  /**
+    * Get a table from either internal or external catalogs.
+    *
+    * @param name The name of the table.
+    * @return The table registered either internally or externally, None otherwise.
+    */
+  protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = {
+
+    // recursively fetches a table from a schema.
+    def getTableFromSchema(
+        schema: SchemaPlus,
+        path: List[String]): Option[org.apache.calcite.schema.Table] = {
+
+      path match {
+        case tableName :: Nil =>
+          // look up table
+          Option(schema.getTable(tableName))
+        case subschemaName :: remain =>
+          // look up subschema
+          val subschema = Option(schema.getSubSchema(subschemaName))
+          subschema match {
+            case Some(s) =>
+              // search for table in subschema
+              getTableFromSchema(s, remain)
+            case None =>
+              // subschema does not exist
+              None
+          }
+      }
+    }
 
-  protected def getRowType(name: String): RelDataType = {
-    rootSchema.getTable(name).getRowType(typeFactory)
+    val pathNames = name.split('.').toList
+    getTableFromSchema(rootSchema, pathNames)
   }
 
   /** Returns a unique temporary attribute name. */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 022b9a2..3129ad3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -96,7 +96,7 @@ class ExternalCatalogSchema(
 
   override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
 
-  override def getTableNames: JSet[String] = JCollections.emptySet[String]
+  override def getTableNames: JSet[String] = new JLinkedHashSet(catalog.listTables())
 
   override def snapshot(v: SchemaVersion): Schema = this
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
new file mode 100644
index 0000000..4b1fb18
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Test for inserting into tables from external catalog.
+  */
+class ExternalCatalogInsertTest extends TableTestBase {
+  private val tableBatchEnv = TableEnvironment.getTableEnvironment(
+    ExecutionEnvironment.getExecutionEnvironment)
+  private val tableStreamEnv = TableEnvironment.getTableEnvironment(
+    StreamExecutionEnvironment.getExecutionEnvironment)
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableBatchEnv.scan("test", "db1", "tb1")
+    val table2 = tableBatchEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableBatchEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val table1 = tableEnv.scan("test", "db1", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+
+    table2.where("d < 3")
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testTopLevelTable(): Unit = {
+    var tableEnv = tableBatchEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableEnv.scan("test", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.tb3")
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c98a7c1..2ca7fba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -71,7 +71,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
         .filter(_.getType.equals(SqlMonikerType.SCHEMA))
         .map(_.getFullyQualifiedNames.asScala.toList).toSet
     assertTrue(Set(List(schemaName), List(schemaName, "db1"),
-      List(schemaName, "db2")) == subSchemas)
+      List(schemaName, "db2"), List(schemaName, "db3")) == subSchemas)
   }
 
   @Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e62396f..64fcc8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -129,16 +129,39 @@ object CommonTestData {
       externalTableBuilder2.inAppendMode()
     }
 
+    val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
+    val connDesc3 = FileSystem().path(tempFilePath3)
+    val formatDesc3 = Csv()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+      .fieldDelimiter("#")
+    val schemaDesc3 = Schema()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+    val externalTableBuilder3 = ExternalCatalogTable.builder(connDesc3)
+      .withFormat(formatDesc3)
+      .withSchema(schemaDesc3)
+
+    if (isStreaming) {
+      externalTableBuilder3.inAppendMode()
+    }
+
     val catalog = new InMemoryExternalCatalog("test")
     val db1 = new InMemoryExternalCatalog("db1")
     val db2 = new InMemoryExternalCatalog("db2")
+    val db3 = new InMemoryExternalCatalog("db3")
     catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
     catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+    catalog.createSubCatalog("db3", db3, ignoreIfExists = false)
 
     // Register the table with both catalogs
     catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
+    catalog.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
     db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false)
+    db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     catalog
   }