You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/24 19:23:37 UTC
[3/5] flink git commit: [FLINK-5570] [table] Register
ExternalCatalogs in TableEnvironment.
[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.
This closes #3409.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/135a57c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/135a57c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/135a57c4
Branch: refs/heads/master
Commit: 135a57c4bb37eaa9cb85faaff1cc694f9448fabd
Parents: 976e03c
Author: jingzhang <be...@126.com>
Authored: Thu Mar 16 11:24:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 37 +++++
.../flink/table/api/TableEnvironment.scala | 104 ++++++++++--
.../org/apache/flink/table/api/exceptions.scala | 62 +++++--
.../table/catalog/ExternalCatalogSchema.scala | 14 +-
.../flink/table/plan/logical/operators.scala | 4 +-
.../flink/table/ExternalCatalogTest.scala | 161 +++++++++++++++++++
.../catalog/ExternalCatalogSchemaTest.scala | 5 +-
7 files changed, 342 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 03b916c..117f32f 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers")
</div>
</div>
+Registering external Catalogs
+--------------------------------
+
+An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog)
+
+{% endhighlight %}
+</div>
+</div>
+
+Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path (`catalog`.`database`.`table`).
+
+Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
Table API
----------
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..bb4c3ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
@@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable.HashMap
+import _root_.scala.annotation.varargs
/**
* The abstract base class for batch and stream TableEnvironments.
@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the catalog to hold all registered and translated tables
// we disable caching here to prevent side effects
private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false)
- private val tables: SchemaPlus = internalSchema.plus()
+ private val rootSchema: SchemaPlus = internalSchema.plus()
// Table API/SQL function catalog
private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
@@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// the configuration to create a Calcite planner
private lazy val frameworkConfig: FrameworkConfig = Frameworks
.newConfigBuilder
- .defaultSchema(tables)
+ .defaultSchema(rootSchema)
.parserConfig(getSqlParserConfig)
.costFactory(new DataSetCostFactory)
.typeSystem(new FlinkTypeSystem)
@@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) {
// a counter for unique attribute names
private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
+ // registered external catalog names -> catalog
+ private val externalCatalogs = new HashMap[String, ExternalCatalog]
+
/** Returns the table config to define the runtime behavior of the Table API. */
def getConfig = config
@@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: TableConfig) {
}
/**
+ * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
+ * All tables registered in the [[ExternalCatalog]] can be accessed.
+ *
+ * @param name The name under which the externalCatalog will be registered
+ * @param externalCatalog The externalCatalog to register
+ */
+ def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+   // the name is taken if the root schema already holds a sub-schema registered under it
+   val nameTaken = rootSchema.getSubSchema(name) != null
+   if (nameTaken) throw new ExternalCatalogAlreadyExistException(name)
+   // keep the catalog for lookups by name and expose it as a Calcite schema below the root
+   externalCatalogs.put(name, externalCatalog)
+   ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+ }
+
+ /**
+ * Gets a registered [[ExternalCatalog]] by name.
+ *
+ * @param name The name to look up the [[ExternalCatalog]]
+ * @return The [[ExternalCatalog]]
+ */
+ def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
+   // the default is evaluated only for a name without a registered catalog
+   externalCatalogs.getOrElse(
+     name,
+     throw new ExternalCatalogNotExistException(name))
+ }
+
+ /**
* Registers a [[ScalarFunction]] under a unique name. Replaces already existing
* user-defined functions under this name.
*/
@@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) {
checkForInstantiation(function.getClass)
// register in Table API
+
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
@@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) {
protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
if (isRegistered(name)) {
- tables.add(name, table)
+ rootSchema.add(name, table)
} else {
throw new TableException(s"Table \'$name\' is not registered.")
}
@@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Scans a registered table and returns the resulting [[Table]].
*
- * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+ * A table to scan must be registered in the TableEnvironment. It can be either directly
+ * registered as DataStream, DataSet, or Table or as member of an [[ExternalCatalog]].
+ *
+ * Examples:
*
- * @param tableName The name of the table to scan.
- * @throws ValidationException if no table is registered under the given name.
- * @return The scanned table.
+ * - Scanning a directly registered table
+ * {{{
+ * val tab: Table = tableEnv.scan("tableName")
+ * }}}
+ *
+ * - Scanning a table from a registered catalog
+ * {{{
+ * val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
+ * }}}
+ *
+ * @param tablePath The path of the table to scan.
+ * @throws TableException if no table is found using the given table path.
+ * @return The resulting [[Table]].
*/
- @throws[ValidationException]
- def scan(tableName: String): Table = {
- if (isRegistered(tableName)) {
- new Table(this, CatalogNode(tableName, getRowType(tableName)))
- } else {
- throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+ @varargs
+ @throws[TableException]
+ def scan(tablePath: String*): Table = {
+   scanInternal(tablePath.toArray) // elements: (sub-)schema names first, table name last
+ }
+
+ @throws[TableException]
+ private def scanInternal(tablePath: Array[String]): Table = {
+   require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
+   // the last path element names the table, all elements before it name (sub-)schemas
+   val parentSchemaPath = tablePath.init
+   val tableName = tablePath.last
+   // both lookups may yield null when nothing is found, hence the Option wrappers
+   val scannedTable = for {
+     parentSchema <- Option(getSchema(parentSchemaPath))
+     catalogTable <- Option(parentSchema.getTable(tableName))
+   } yield new Table(this, CatalogNode(tablePath, catalogTable.getRowType(typeFactory)))
+   scannedTable.getOrElse(
+     throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found."))
+ }
+
+ private def getSchema(schemaPath: Array[String]): SchemaPlus = {
+   // Descends from the root schema by one sub-schema per path element. An empty path
+   // yields the root schema; a missing sub-schema yields null for the whole path.
+   // A fold is used because `return` inside a for-loop closure is a non-local return.
+   val resolved = schemaPath.foldLeft(rootSchema) { (parent, schemaName) =>
+     if (parent == null) null
+     else parent.getSubSchema(schemaName)
}
+   resolved
}
/**
@@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: TableConfig) {
throw new TableException(s"Table \'$name\' already exists. " +
s"Please, choose a different name.")
} else {
- tables.add(name, table)
+ rootSchema.add(name, table)
}
}
@@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) {
* @return true, if a table is registered under the name, false otherwise.
*/
protected def isRegistered(name: String): Boolean = {
- tables.getTableNames.contains(name)
+ rootSchema.getTableNames.contains(name)
}
protected def getRowType(name: String): RelDataType = {
- tables.getTable(name).getRowType(typeFactory)
+ rootSchema.getTable(name).getRowType(typeFactory)
}
/** Returns a unique temporary attribute name. */
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 8632436..760cf75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -75,34 +75,34 @@ object ValidationException {
case class UnresolvedException(msg: String) extends RuntimeException(msg)
/**
- * Exception for operation on a nonexistent table
+ * Exception for an operation on a nonexistent table
*
* @param db database name
* @param table table name
- * @param cause
+ * @param cause the cause
*/
case class TableNotExistException(
db: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"table $db.$table does not exist!", cause) {
+ extends RuntimeException(s"Table $db.$table does not exist.", cause) {
def this(db: String, table: String) = this(db, table, null)
}
/**
- * Exception for adding an already existed table
+ * Exception for adding an already existent table
*
* @param db database name
* @param table table name
- * @param cause
+ * @param cause the cause
*/
case class TableAlreadyExistException(
db: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"table $db.$table already exists!", cause) {
+ extends RuntimeException(s"Table $db.$table already exists.", cause) {
def this(db: String, table: String) = this(db, table, null)
@@ -112,56 +112,84 @@ case class TableAlreadyExistException(
* Exception for operation on a nonexistent database
*
* @param db database name
- * @param cause
+ * @param cause the cause
*/
case class DatabaseNotExistException(
db: String,
cause: Throwable)
- extends RuntimeException(s"database $db does not exist!", cause) {
+ extends RuntimeException(s"Database $db does not exist.", cause) {
def this(db: String) = this(db, null)
}
/**
- * Exception for adding an already existed database
+ * Exception for adding an already existent database
*
* @param db database name
- * @param cause
+ * @param cause the cause
*/
case class DatabaseAlreadyExistException(
db: String,
cause: Throwable)
- extends RuntimeException(s"database $db already exists!", cause) {
+ extends RuntimeException(s"Database $db already exists.", cause) {
def this(db: String) = this(db, null)
}
/**
- * Exception for does not find any matched [[TableSourceConverter]] for a specified table type
+ * Exception for not finding a [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
- * @param cause
+ * @param cause the cause
*/
case class NoMatchedTableSourceConverterException(
tableType: String,
cause: Throwable)
- extends RuntimeException(s"find no table source converter matched table type $tableType!",
+ extends RuntimeException(s"Could not find a TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
/**
- * Exception for find more than one matched [[TableSourceConverter]] for a specified table type
+ * Exception for finding more than one [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
- * @param cause
+ * @param cause the cause
*/
case class AmbiguousTableSourceConverterException(
tableType: String,
cause: Throwable)
- extends RuntimeException(s"more than one table source converter matched table type $tableType!",
+ extends RuntimeException(s"More than one TableSourceConverter for table type $tableType.",
cause) {
def this(tableType: String) = this(tableType, null)
}
+
+/**
+ * Exception for an operation on a nonexistent external catalog
+ *
+ * @param catalogName external catalog name
+ * @param cause the cause
+ */
+case class ExternalCatalogNotExistException(catalogName: String, cause: Throwable)
+  extends RuntimeException(s"External catalog $catalogName does not exist.", cause) {
+  /** Creates the exception for the given catalog name without an underlying cause. */
+  def this(catalogName: String) = {
+    this(catalogName, null)
+  }
+}
+
+/**
+ * Exception for adding an already existent external catalog
+ *
+ * @param catalogName external catalog name
+ * @param cause the cause
+ */
+case class ExternalCatalogAlreadyExistException(catalogName: String, cause: Throwable)
+  extends RuntimeException(s"External catalog $catalogName already exists.", cause) {
+  /** Creates the exception for the given catalog name without an underlying cause. */
+  def this(catalogName: String) = {
+    this(catalogName, null)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index e3ed96e..8e010fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -136,20 +136,18 @@ class ExternalCatalogSchema(
object ExternalCatalogSchema {
/**
- * Creates a FlinkExternalCatalogSchema.
+ * Registers an external catalog in a Calcite schema.
*
- * @param parentSchema Parent schema
- * @param externalCatalogIdentifier External catalog identifier
- * @param externalCatalog External catalog object
- * @return Created schema
+ * @param parentSchema Parent schema into which the catalog is registered
+ * @param externalCatalogIdentifier Identifier of the external catalog
+ * @param externalCatalog The external catalog to register
*/
- def create(
+ def registerCatalog(
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
- externalCatalog: ExternalCatalog): ExternalCatalogSchema = {
+ externalCatalog: ExternalCatalog): Unit = {
val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
- newSchema
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 1b5eafb..559bd75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -511,7 +511,7 @@ case class Join(
}
case class CatalogNode(
- tableName: String,
+ tablePath: Array[String],
rowType: RelDataType) extends LeafNode {
val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
@@ -519,7 +519,7 @@ case class CatalogNode(
}
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- relBuilder.scan(tableName)
+ relBuilder.scan(tablePath.toIterable.asJava)
}
override def validate(tableEnv: TableEnvironment): LogicalNode = this
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
new file mode 100644
index 0000000..696468d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Test for external catalog query plan.
+ */
+class ExternalCatalogTest extends TableTestBase {
+  // paths of the two catalog tables the tests scan, and the fields expected in those scans
+  private val table1Path: Array[String] = Array("test", "db1", "tb1")
+  private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+  private val table2Path: Array[String] = Array("test", "db2", "tb2")
+  private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+  // the query that both SQL tests run
+  private val unionSql: String =
+    "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+      "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+  /** Table API query on two catalog tables, planned for the batch environment. */
+  @Test
+  def testBatchTableApi(): Unit = {
+    val testUtil = batchTestUtil()
+    val tableEnv = testUtil.tEnv
+    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val tb1 = tableEnv.scan("test", "db1", "tb1")
+    val tb2 = tableEnv.scan("test", "db2", "tb2")
+    val unioned = tb2
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(tb1.select('a * 2, 'b, 'c.upperCase()))
+
+    // expected plan: a union of one calc per table, each calc on top of a table source scan
+    val tb2Branch = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(table2Path, table2ProjectedFields),
+      term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"))
+    val tb1Branch = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(table1Path, table1ProjectedFields),
+      term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2"))
+    val expectedPlan = binaryNode(
+      "DataSetUnion", tb2Branch, tb1Branch, term("union", "_c0", "e", "_c2"))
+
+    testUtil.verifyTable(unioned, expectedPlan)
+  }
+
+  /** SQL query on two catalog tables, planned for the batch environment. */
+  @Test
+  def testBatchSQL(): Unit = {
+    val testUtil = batchTestUtil()
+    testUtil.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    // expected plan: a union of one calc per table, each calc on top of a table source scan
+    val filteredBranch = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(table2Path, table2ProjectedFields),
+      term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+      term("where", "<(d, 3)"))
+    val plainBranch = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(table1Path, table1ProjectedFields),
+      term("select", "*(a, 2) AS EXPR$0", "b", "c"))
+    val expectedPlan = binaryNode(
+      "DataSetUnion", filteredBranch, plainBranch, term("union", "EXPR$0", "e", "g"))
+
+    testUtil.verifySql(unionSql, expectedPlan)
+  }
+
+  /** Table API query on two catalog tables, planned for the stream environment. */
+  @Test
+  def testStreamTableApi(): Unit = {
+    val testUtil = streamTestUtil()
+    val tableEnv = testUtil.tEnv
+    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val tb1 = tableEnv.scan("test", "db1", "tb1")
+    val tb2 = tableEnv.scan("test", "db2", "tb2")
+    val unioned = tb2
+      .where("d < 3")
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(tb1.select('a * 2, 'b, 'c.upperCase()))
+
+    // expected plan: a union of one calc per table, each calc on top of a table source scan
+    val tb2Branch = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(table2Path, table2ProjectedFields),
+      term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+      term("where", "<(d, 3)"))
+    val tb1Branch = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(table1Path, table1ProjectedFields),
+      term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2"))
+    val expectedPlan = binaryNode(
+      "DataStreamUnion", tb2Branch, tb1Branch, term("union", "_c0", "e", "_c2"))
+
+    testUtil.verifyTable(unioned, expectedPlan)
+  }
+
+  /** SQL query on two catalog tables, planned for the stream environment. */
+  @Test
+  def testStreamSQL(): Unit = {
+    val testUtil = streamTestUtil()
+    testUtil.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    // expected plan: a union of one calc per table, each calc on top of a table source scan
+    val filteredBranch = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(table2Path, table2ProjectedFields),
+      term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+      term("where", "<(d, 3)"))
+    val plainBranch = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(table1Path, table1ProjectedFields),
+      term("select", "*(a, 2) AS EXPR$0", "b", "c"))
+    val expectedPlan = binaryNode(
+      "DataStreamUnion", filteredBranch, plainBranch, term("union", "EXPR$0", "e", "g"))
+
+    testUtil.verifySql(unionSql, expectedPlan)
+  }
+
+  /**
+   * Builds the plan string expected for a batch scan of the table at the given path.
+   */
+  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    val path = sourceTablePath.mkString(", ")
+    val fieldList = fields.mkString(", ")
+    s"BatchTableSourceScan(table=[[$path]], " + s"fields=[$fieldList])"
+  }
+
+  /**
+   * Builds the plan string expected for a stream scan of the table at the given path.
+   */
+  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    val path = sourceTablePath.mkString(", ")
+    val fieldList = fields.mkString(", ")
+    s"StreamTableSourceScan(table=[[$path]], " + s"fields=[$fieldList])"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 6ffa8c6..b780a3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
class ExternalCatalogSchemaTest {
private val schemaName: String = "test"
- private var externalCatalogSchema: ExternalCatalogSchema = _
+ private var externalCatalogSchema: SchemaPlus = _
private var calciteCatalogReader: CalciteCatalogReader = _
private val db = "db1"
private val tb = "tb1"
@@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog
- externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
+ ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+ externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
calciteCatalogReader = new CalciteCatalogReader(
CalciteSchema.from(rootSchemaPlus),