You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/19 12:21:25 UTC
flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.
Repository: flink
Updated Branches:
refs/heads/master 6ae759ae5 -> acea4cde5
[FLINK-6574] [table] Support nested catalogs in ExternalCatalog.
This closes #3913.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acea4cde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acea4cde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acea4cde
Branch: refs/heads/master
Commit: acea4cde5f0225db9e00bbef4a47fdb58419022b
Parents: 6ae759a
Author: Haohui Mai <wh...@apache.org>
Authored: Mon May 15 17:09:18 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 19 14:21:19 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/table/api/exceptions.scala | 48 +++----
.../table/catalog/CrudExternalCatalog.scala | 78 +++++-----
.../flink/table/catalog/ExternalCatalog.scala | 38 ++---
.../table/catalog/ExternalCatalogDatabase.scala | 31 ----
.../table/catalog/ExternalCatalogSchema.scala | 91 +++++-------
.../table/catalog/ExternalCatalogTable.scala | 16 ---
.../table/catalog/InMemoryExternalCatalog.scala | 142 +++++++------------
.../flink/table/ExternalCatalogTest.scala | 33 +++++
.../catalog/InMemoryExternalCatalogTest.scala | 103 +++++++-------
.../flink/table/utils/CommonTestData.scala | 17 ++-
10 files changed, 259 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf75..7ea17fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends RuntimeException(msg)
/**
* Exception for an operation on a nonexistent table
*
- * @param db database name
- * @param table table name
- * @param cause the cause
+ * @param catalog catalog name
+ * @param table table name
+ * @param cause the cause
*/
case class TableNotExistException(
- db: String,
+ catalog: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"Table $db.$table does not exist.", cause) {
+ extends RuntimeException(s"Table $catalog.$table does not exist.", cause) {
- def this(db: String, table: String) = this(db, table, null)
+ def this(catalog: String, table: String) = this(catalog, table, null)
}
/**
* Exception for adding an already existent table
*
- * @param db database name
- * @param table table name
- * @param cause the cause
+ * @param catalog catalog name
+ * @param table table name
+ * @param cause the cause
*/
case class TableAlreadyExistException(
- db: String,
+ catalog: String,
table: String,
cause: Throwable)
- extends RuntimeException(s"Table $db.$table already exists.", cause) {
+ extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
- def this(db: String, table: String) = this(db, table, null)
+ def this(catalog: String, table: String) = this(catalog, table, null)
}
/**
- * Exception for operation on a nonexistent database
+ * Exception for operation on a nonexistent catalog
*
- * @param db database name
+ * @param catalog catalog name
* @param cause the cause
*/
-case class DatabaseNotExistException(
- db: String,
+case class CatalogNotExistException(
+ catalog: String,
cause: Throwable)
- extends RuntimeException(s"Database $db does not exist.", cause) {
+ extends RuntimeException(s"Catalog $catalog does not exist.", cause) {
- def this(db: String) = this(db, null)
+ def this(catalog: String) = this(catalog, null)
}
/**
- * Exception for adding an already existent database
+ * Exception for adding an already existent catalog
*
- * @param db database name
+ * @param catalog catalog name
* @param cause the cause
*/
-case class DatabaseAlreadyExistException(
- db: String,
+case class CatalogAlreadyExistException(
+ catalog: String,
cause: Throwable)
- extends RuntimeException(s"Database $db already exists.", cause) {
+ extends RuntimeException(s"Catalog $catalog already exists.", cause) {
- def this(db: String) = this(db, null)
+ def this(catalog: String) = this(catalog, null)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index fcefa45..4db9497 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -21,88 +21,86 @@ package org.apache.flink.table.catalog
import org.apache.flink.table.api._
/**
- * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables.
+ * The CrudExternalCatalog provides methods to create, drop, and alter (sub-)catalogs or tables.
*/
trait CrudExternalCatalog extends ExternalCatalog {
/**
- * Adds a table to the catalog.
+ * Adds a table to this catalog.
*
- * @param table Description of the table to add
+ * @param tableName The name of the table to add.
+ * @param table The table to add.
* @param ignoreIfExists Flag to specify behavior if a table with the given name already exists:
* if set to false, it throws a TableAlreadyExistException,
* if set to true, nothing happens.
- * @throws DatabaseNotExistException thrown if database does not exist
* @throws TableAlreadyExistException thrown if table already exists and ignoreIfExists is false
*/
- @throws[DatabaseNotExistException]
@throws[TableAlreadyExistException]
- def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
+ def createTable(tableName: String, table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
/**
- * Deletes table from a database of the catalog.
+ * Deletes table from this catalog.
*
- * @param dbName Name of the database
- * @param tableName Name of the table
- * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+ * @param tableName Name of the table to delete.
+ * @param ignoreIfNotExists Flag to specify behavior if the table does not exist:
* if set to false, throw an exception,
* if set to true, nothing happens.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
* @throws TableNotExistException thrown if the table does not exist in the catalog
*/
- @throws[DatabaseNotExistException]
@throws[TableNotExistException]
- def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+ def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit
/**
- * Modifies an existing table in the catalog.
+ * Modifies an existing table of this catalog.
*
- * @param table New description of the table to update
- * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+ * @param tableName The name of the table to modify.
+ * @param table The new table which replaces the existing table.
+ * @param ignoreIfNotExists Flag to specify behavior if the table does not exist:
* if set to false, throw an exception,
* if set to true, nothing happens.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
- * @throws TableNotExistException thrown if the table does not exist in the catalog
+ * @throws TableNotExistException thrown if the table does not exist in the catalog
*/
- @throws[DatabaseNotExistException]
@throws[TableNotExistException]
- def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
+ def alterTable(tableName: String, table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
/**
- * Adds a database to the catalog.
+ * Adds a subcatalog to this catalog.
*
- * @param db Description of the database to create
- * @param ignoreIfExists Flag to specify behavior if a database with the given name already
- * exists: if set to false, it throws a DatabaseAlreadyExistException,
+ * @param name The name of the sub catalog to add.
+ * @param catalog The sub catalog to add.
+ * @param ignoreIfExists Flag to specify behavior if a sub catalog with the given name already
+ * exists: if set to false, it throws a CatalogAlreadyExistException,
* if set to true, nothing happens.
- * @throws DatabaseAlreadyExistException thrown if the database does already exist in the catalog
- * and ignoreIfExists is false
+ * @throws CatalogAlreadyExistException
+ * thrown if the sub catalog already exists in the catalog
+ * and ignoreIfExists is false
*/
- @throws[DatabaseAlreadyExistException]
- def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
+ @throws[CatalogAlreadyExistException]
+ def createSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfExists: Boolean): Unit
/**
- * Deletes a database from the catalog.
+ * Deletes a sub catalog from this catalog.
*
- * @param dbName Name of the database.
- * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+ * @param name Name of the sub catalog to delete.
+ * @param ignoreIfNotExists Flag to specify behavior if the sub catalog does not exist:
* if set to false, throw an exception,
* if set to true, nothing happens.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog
*/
- @throws[DatabaseNotExistException]
- def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
+ @throws[CatalogNotExistException]
+ def dropSubCatalog(name: String, ignoreIfNotExists: Boolean): Unit
/**
- * Modifies an existing database in the catalog.
+ * Modifies an existing sub catalog of this catalog.
*
- * @param db New description of the database to update
- * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+ * @param name Name of the sub catalog to modify.
+ * @param catalog The new sub catalog to replace the existing sub catalog.
+ * @param ignoreIfNotExists Flag to specify behavior if the sub catalog does not exist:
* if set to false, throw an exception,
* if set to true, nothing happens.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+ * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog
*/
- @throws[DatabaseNotExistException]
- def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
+ @throws[CatalogNotExistException]
+ def alterSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfNotExists: Boolean): Unit
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 00a35e4..5f4511b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -26,49 +26,41 @@ import org.apache.flink.table.api._
* An [[ExternalCatalog]] is the connector between an external database catalog and Flink's
* Table API.
*
- * It provides information about databases and tables such as names, schema, statistics, and
- * access information.
+ * It provides information about catalogs, databases and tables such as names, schema, statistics,
+ * and access information.
*/
trait ExternalCatalog {
/**
- * Get a table from the catalog
+ * Get a table from this catalog.
*
- * @param dbName The name of the table's database.
* @param tableName The name of the table.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog.
* @throws TableNotExistException thrown if the table does not exist in the catalog.
- * @return the requested table
+ * @return The requested table.
*/
- @throws[DatabaseNotExistException]
@throws[TableNotExistException]
- def getTable(dbName: String, tableName: String): ExternalCatalogTable
+ def getTable(tableName: String): ExternalCatalogTable
/**
- * Get a list of all table names of a database in the catalog.
+ * Gets the names of all tables registered in this catalog.
*
- * @param dbName The name of the database.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
- * @return The list of table names
+ * @return A list of the names of all registered tables.
*/
- @throws[DatabaseNotExistException]
- def listTables(dbName: String): JList[String]
+ def listTables(): JList[String]
/**
- * Gets a database from the catalog.
+ * Gets the sub catalog with the given name from this catalog.
*
- * @param dbName The name of the database.
- * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
- * @return The requested database
+ * @return The requested sub catalog.
*/
- @throws[DatabaseNotExistException]
- def getDatabase(dbName: String): ExternalCatalogDatabase
+ @throws[CatalogNotExistException]
+ def getSubCatalog(dbName: String): ExternalCatalog
/**
- * Gets a list of all databases in the catalog.
+ * Gets the names of all sub catalogs registered in this catalog.
*
- * @return The list of database names
+ * @return The list of the names of all registered sub catalogs.
*/
- def listDatabases(): JList[String]
+ def listSubCatalogs(): JList[String]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
deleted file mode 100644
index 99ab2eb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-/**
- * Defines a database in an [[ExternalCatalog]].
- *
- * @param dbName The name of the database
- * @param properties The properties of the database
- */
-case class ExternalCatalogDatabase(
- dbName: String,
- properties: JMap[String, String] = new JHashMap())
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 8e010fa..ad96e77 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -22,18 +22,16 @@ import java.util.{Collections => JCollections, Collection => JCollection, Linked
import org.apache.calcite.linq4j.tree.Expression
import org.apache.calcite.schema._
-import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
+import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
/**
- * This class is responsible for connect external catalog to calcite catalog.
- * In this way, it is possible to look-up and access tables in SQL queries
- * without registering tables in advance.
- * The databases in the external catalog registers as calcite sub-Schemas of current schema.
- * The tables in a given database registers as calcite tables
- * of the [[ExternalCatalogDatabaseSchema]].
+ * This class is responsible for connecting an external catalog to Calcite's catalog.
+ * This makes it possible to look up and access tables in SQL queries without registering
+ * them in advance. The external catalog and all included sub-catalogs and tables are
+ * registered as sub-schemas and tables in Calcite.
*
* @param catalogIdentifier external catalog name
* @param catalog external catalog
@@ -45,32 +43,47 @@ class ExternalCatalogSchema(
private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
/**
- * Looks up database by the given sub-schema name in the external catalog,
- * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
+ * Looks up a sub-schema by the given sub-schema name in the external catalog.
+ * Returns it wrapped in an [[ExternalCatalogSchema]] with the given sub-schema name.
*
- * @param name Sub-schema name
- * @return Sub-schema with a given name, or null
+ * @param name Name of sub-schema to look up.
+ * @return Sub-schema with a given name, or null.
*/
override def getSubSchema(name: String): Schema = {
try {
- val db = catalog.getDatabase(name)
- new ExternalCatalogDatabaseSchema(db.dbName, catalog)
+ val db = catalog.getSubCatalog(name)
+ new ExternalCatalogSchema(name, db)
} catch {
- case e: DatabaseNotExistException =>
- LOG.warn(s"Database $name does not exist in externalCatalog $catalogIdentifier")
+ case _: CatalogNotExistException =>
+ LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
null
}
}
/**
- * Lists the databases of the external catalog,
- * returns the lists as the names of this schema's sub-schemas.
+ * Lists the sub-schemas of the external catalog.
+ * Returns a list of names of this schema's sub-schemas.
*
* @return names of this schema's child schemas
*/
- override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listDatabases())
+ override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listSubCatalogs())
- override def getTable(name: String): Table = null
+ /**
+ * Looks up and returns a table from this schema.
+ * Returns null if no table is found for the given name.
+ *
+ * @param name The name of the table to look up.
+ * @return The table or null if no table is found.
+ */
+ override def getTable(name: String): Table = try {
+ val externalCatalogTable = catalog.getTable(name)
+ ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
+ } catch {
+ case TableNotExistException(table, _, _) => {
+ LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier")
+ null
+ }
+ }
override def isMutable: Boolean = true
@@ -91,46 +104,8 @@ class ExternalCatalogSchema(
* @param plusOfThis
*/
def registerSubSchemas(plusOfThis: SchemaPlus) {
- catalog.listDatabases().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db)))
- }
-
- private class ExternalCatalogDatabaseSchema(
- schemaName: String,
- flinkExternalCatalog: ExternalCatalog) extends Schema {
-
- override def getTable(name: String): Table = {
- try {
- val externalCatalogTable = flinkExternalCatalog.getTable(schemaName, name)
- ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
- } catch {
- case TableNotExistException(db, table, cause) => {
- LOG.warn(s"Table $db.$table does not exist in externalCatalog $catalogIdentifier")
- null
- }
- }
- }
-
- override def getTableNames: JSet[String] =
- new JLinkedHashSet(flinkExternalCatalog.listTables(schemaName))
-
- override def getSubSchema(name: String): Schema = null
-
- override def getSubSchemaNames: JSet[String] = JCollections.emptySet[String]
-
- override def isMutable: Boolean = true
-
- override def getFunctions(name: String): JCollection[Function] =
- JCollections.emptyList[Function]
-
- override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
- Schemas.subSchemaExpression(parentSchema, name, getClass)
-
- override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
-
- override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = true
-
+ catalog.listSubCatalogs().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db)))
}
-
}
object ExternalCatalogSchema {
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 01eca6d..ae20718 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.plan.stats.TableStats
/**
* Defines a table in an [[ExternalCatalog]].
*
- * @param identifier Identifier of the table (database name and table name)
* @param tableType Table type, e.g csv, hbase, kafka
* @param schema Schema of the table (column names and types)
* @param properties Properties of the table
@@ -37,7 +36,6 @@ import org.apache.flink.table.plan.stats.TableStats
* @param lastAccessTime Timestamp of last access of the table
*/
case class ExternalCatalogTable(
- identifier: TableIdentifier,
tableType: String,
schema: TableSchema,
properties: JMap[String, String] = new JHashMap(),
@@ -45,17 +43,3 @@ case class ExternalCatalogTable(
comment: String = null,
createTime: JLong = System.currentTimeMillis,
lastAccessTime: JLong = -1L)
-
-/**
- * Identifier for a catalog table.
- *
- * @param database Database name
- * @param table Table name
- */
-case class TableIdentifier(
- database: String,
- table: String) {
-
- override def toString: String = s"$database.$table"
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
index 6a61916..ee30a8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
@@ -18,138 +18,106 @@
package org.apache.flink.table.catalog
-import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
import java.util.{List => JList}
-import scala.collection.mutable.HashMap
+import org.apache.flink.table.api.{CatalogAlreadyExistException, CatalogNotExistException, TableAlreadyExistException, TableNotExistException}
+
+import scala.collection.mutable
import scala.collection.JavaConverters._
/**
* This class is an in-memory implementation of [[ExternalCatalog]].
*
+ * @param name The name of the catalog
+ *
* It could be used for testing or developing instead of used in production environment.
*/
-class InMemoryExternalCatalog extends CrudExternalCatalog {
+class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
- private val databases = new HashMap[String, Database]
+ private val databases = new mutable.HashMap[String, ExternalCatalog]
+ private val tables = new mutable.HashMap[String, ExternalCatalogTable]
- @throws[DatabaseNotExistException]
@throws[TableAlreadyExistException]
override def createTable(
- table: ExternalCatalogTable,
- ignoreIfExists: Boolean): Unit = synchronized {
- val dbName = table.identifier.database
- val tables = getTables(dbName)
- val tableName = table.identifier.table
- if (tables.contains(tableName)) {
- if (!ignoreIfExists) {
- throw new TableAlreadyExistException(dbName, tableName)
- }
- } else {
- tables.put(tableName, table)
+ tableName: String,
+ table: ExternalCatalogTable,
+ ignoreIfExists: Boolean): Unit = synchronized {
+ tables.get(tableName) match {
+ case Some(_) if !ignoreIfExists => throw new TableAlreadyExistException(name, tableName)
+ case _ => tables.put(tableName, table)
}
}
- @throws[DatabaseNotExistException]
@throws[TableNotExistException]
- override def dropTable(
- dbName: String,
- tableName: String,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- val tables = getTables(dbName)
+ override def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit = synchronized {
if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
- throw new TableNotExistException(dbName, tableName)
+ throw new TableNotExistException(name, tableName)
}
}
- @throws[DatabaseNotExistException]
@throws[TableNotExistException]
override def alterTable(
- table: ExternalCatalogTable,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- val dbName = table.identifier.database
- val tables = getTables(dbName)
- val tableName = table.identifier.table
+ tableName: String,
+ table: ExternalCatalogTable,
+ ignoreIfNotExists: Boolean): Unit = synchronized {
if (tables.contains(tableName)) {
tables.put(tableName, table)
} else if (!ignoreIfNotExists) {
- throw new TableNotExistException(dbName, tableName)
+ throw new TableNotExistException(name, tableName)
}
}
- @throws[DatabaseNotExistException]
- override def listTables(dbName: String): JList[String] = synchronized {
- val tables = getTables(dbName)
- tables.keys.toList.asJava
- }
-
- @throws[DatabaseNotExistException]
- @throws[TableNotExistException]
- override def getTable(dbName: String, tableName: String): ExternalCatalogTable = synchronized {
- val tables = getTables(dbName)
- tables.get(tableName) match {
- case Some(table) => table
- case None => throw new TableNotExistException(dbName, tableName)
+ @throws[CatalogAlreadyExistException]
+ override def createSubCatalog(
+ catalogName: String,
+ catalog: ExternalCatalog,
+ ignoreIfExists: Boolean): Unit = synchronized {
+ databases.get(catalogName) match {
+ case Some(_) if !ignoreIfExists => throw CatalogAlreadyExistException(catalogName, null)
+ case _ => databases.put(catalogName, catalog)
}
}
- @throws[DatabaseAlreadyExistException]
- override def createDatabase(
- db: ExternalCatalogDatabase,
- ignoreIfExists: Boolean): Unit = synchronized {
- val dbName = db.dbName
- if (databases.contains(dbName)) {
- if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(dbName)
- }
- } else {
- databases.put(dbName, new Database(db))
+ @throws[CatalogNotExistException]
+ override def dropSubCatalog(
+ catalogName: String,
+ ignoreIfNotExists: Boolean): Unit = synchronized {
+ if (databases.remove(catalogName).isEmpty && !ignoreIfNotExists) {
+ throw CatalogNotExistException(catalogName, null)
}
}
- @throws[DatabaseNotExistException]
- override def alterDatabase(
- db: ExternalCatalogDatabase,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- val dbName = db.dbName
- databases.get(dbName) match {
- case Some(database) => database.db = db
- case None =>
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(dbName)
- }
+ override def alterSubCatalog(
+ catalogName: String,
+ catalog: ExternalCatalog,
+ ignoreIfNotExists: Boolean): Unit = synchronized {
+ if (databases.contains(catalogName)) {
+ databases.put(catalogName, catalog)
+ } else if (!ignoreIfNotExists) {
+ throw new CatalogNotExistException(catalogName)
}
}
- @throws[DatabaseNotExistException]
- override def dropDatabase(
- dbName: String,
- ignoreIfNotExists: Boolean): Unit = synchronized {
- if (databases.remove(dbName).isEmpty && !ignoreIfNotExists) {
- throw new DatabaseNotExistException(dbName)
+ override def getTable(tableName: String): ExternalCatalogTable = synchronized {
+ tables.get(tableName) match {
+ case Some(t) => t
+ case _ => throw TableNotExistException(name, tableName, null)
}
}
- override def listDatabases(): JList[String] = synchronized {
- databases.keys.toList.asJava
+ override def listTables(): JList[String] = synchronized {
+ tables.keys.toList.asJava
}
- @throws[DatabaseNotExistException]
- override def getDatabase(dbName: String): ExternalCatalogDatabase = synchronized {
- databases.get(dbName) match {
- case Some(database) => database.db
- case None => throw new DatabaseNotExistException(dbName)
+ @throws[CatalogNotExistException]
+ override def getSubCatalog(catalogName: String): ExternalCatalog = synchronized {
+ databases.get(catalogName) match {
+ case Some(d) => d
+ case _ => throw CatalogNotExistException(catalogName, null)
}
}
- private def getTables(db: String): HashMap[String, ExternalCatalogTable] =
- databases.get(db) match {
- case Some(database) => database.tables
- case None => throw new DatabaseNotExistException(db)
- }
-
- private class Database(var db: ExternalCatalogDatabase) {
- val tables = new HashMap[String, ExternalCatalogTable]
+ override def listSubCatalogs(): JList[String] = synchronized {
+ databases.keys.toList.asJava
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
index d801644..27dd8d8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -28,6 +28,7 @@ import org.junit.Test
*/
class ExternalCatalogTest extends TableTestBase {
private val table1Path: Array[String] = Array("test", "db1", "tb1")
+ private val table1TopLevelPath: Array[String] = Array("test", "tb1")
private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
private val table2Path: Array[String] = Array("test", "db2", "tb2")
private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
@@ -148,6 +149,38 @@ class ExternalCatalogTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}
+
+ @Test
+ def testTopLevelTable(): Unit = {
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tEnv.scan("test", "tb1")
+ val table2 = tEnv.scan("test", "db2", "tb2")
+ val result = table2
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
s"fields=[${fields.mkString(", ")}])"
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 5402780..6d1d66f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -31,15 +31,14 @@ class InMemoryExternalCatalogTest {
@Before
def setUp(): Unit = {
- catalog = new InMemoryExternalCatalog()
- catalog.createDatabase(ExternalCatalogDatabase(databaseName), ignoreIfExists = false)
+ catalog = new InMemoryExternalCatalog(databaseName)
}
@Test
def testCreateTable(): Unit = {
- assertTrue(catalog.listTables(databaseName).isEmpty)
- catalog.createTable(createTableInstance(databaseName, "t1"), ignoreIfExists = false)
- val tables = catalog.listTables(databaseName)
+ assertTrue(catalog.listTables().isEmpty)
+ catalog.createTable("t1", createTableInstance(), ignoreIfExists = false)
+ val tables = catalog.listTables()
assertEquals(1, tables.size())
assertEquals("t1", tables.get(0))
}
@@ -47,36 +46,31 @@ class InMemoryExternalCatalogTest {
@Test(expected = classOf[TableAlreadyExistException])
def testCreateExistedTable(): Unit = {
val tableName = "t1"
- catalog.createTable(createTableInstance(databaseName, tableName), false)
- catalog.createTable(createTableInstance(databaseName, tableName), false)
+ catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
+ catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
}
@Test
def testGetTable(): Unit = {
- val originTable = createTableInstance(databaseName, "t1")
- catalog.createTable(originTable, false)
- assertEquals(catalog.getTable(databaseName, "t1"), originTable)
- }
-
- @Test(expected = classOf[DatabaseNotExistException])
- def testGetTableUnderNotExistDatabaseName(): Unit = {
- catalog.getTable("notexistedDb", "t1")
+ val originTable = createTableInstance()
+ catalog.createTable("t1", originTable, ignoreIfExists = false)
+ assertEquals(catalog.getTable("t1"), originTable)
}
@Test(expected = classOf[TableNotExistException])
def testGetNotExistTable(): Unit = {
- catalog.getTable(databaseName, "t1")
+ catalog.getTable("nonexisted")
}
@Test
def testAlterTable(): Unit = {
val tableName = "t1"
- val table = createTableInstance(databaseName, tableName)
- catalog.createTable(table, false)
- assertEquals(catalog.getTable(databaseName, tableName), table)
- val newTable = createTableInstance(databaseName, tableName)
- catalog.alterTable(newTable, false)
- val currentTable = catalog.getTable(databaseName, tableName)
+ val table = createTableInstance()
+ catalog.createTable(tableName, table, ignoreIfExists = false)
+ assertEquals(catalog.getTable(tableName), table)
+ val newTable = createTableInstance()
+ catalog.alterTable(tableName, newTable, ignoreIfNotExists = false)
+ val currentTable = catalog.getTable(tableName)
// validate the table is really replaced after alter table
assertNotEquals(table, currentTable)
assertEquals(newTable, currentTable)
@@ -84,53 +78,61 @@ class InMemoryExternalCatalogTest {
@Test(expected = classOf[TableNotExistException])
def testAlterNotExistTable(): Unit = {
- catalog.alterTable(createTableInstance(databaseName, "t1"), false)
+ catalog.alterTable("nonexisted", createTableInstance(), ignoreIfNotExists = false)
}
@Test
def testDropTable(): Unit = {
val tableName = "t1"
- catalog.createTable(createTableInstance(databaseName, tableName), false)
- assertTrue(catalog.listTables(databaseName).contains(tableName))
- catalog.dropTable(databaseName, tableName, false)
- assertFalse(catalog.listTables(databaseName).contains(tableName))
+ catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
+ assertTrue(catalog.listTables().contains(tableName))
+ catalog.dropTable(tableName, ignoreIfNotExists = false)
+ assertFalse(catalog.listTables().contains(tableName))
}
@Test(expected = classOf[TableNotExistException])
def testDropNotExistTable(): Unit = {
- catalog.dropTable(databaseName, "t1", false)
- }
-
- @Test
- def testListDatabases(): Unit = {
- val databases = catalog.listDatabases()
- assertEquals(1, databases.size())
- assertEquals(databaseName, databases.get(0))
- }
-
- @Test
- def testGetDatabase(): Unit = {
- assertNotNull(catalog.getDatabase(databaseName))
+ catalog.dropTable("nonexisted", ignoreIfNotExists = false)
}
- @Test(expected = classOf[DatabaseNotExistException])
+ @Test(expected = classOf[CatalogNotExistException])
def testGetNotExistDatabase(): Unit = {
- catalog.getDatabase("notexistedDb")
+ catalog.getSubCatalog("notexistedDb")
}
@Test
def testCreateDatabase(): Unit = {
- val originDatabasesNum = catalog.listDatabases().size
- catalog.createDatabase(ExternalCatalogDatabase("db2"), false)
- assertEquals(catalog.listDatabases().size, originDatabasesNum + 1)
+ catalog.createSubCatalog("db2", new InMemoryExternalCatalog("db2"), ignoreIfExists = false)
+ assertEquals(1, catalog.listSubCatalogs().size)
}
- @Test(expected = classOf[DatabaseAlreadyExistException])
+ @Test(expected = classOf[CatalogAlreadyExistException])
def testCreateExistedDatabase(): Unit = {
- catalog.createDatabase(ExternalCatalogDatabase(databaseName), false)
+ catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
+ ignoreIfExists = false)
+
+ assertNotNull(catalog.getSubCatalog("existed"))
+ val databases = catalog.listSubCatalogs()
+ assertEquals(1, databases.size())
+ assertEquals("existed", databases.get(0))
+
+ catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
+ ignoreIfExists = false)
+ }
+
+ @Test
+ def testNestedCatalog(): Unit = {
+ val sub = new InMemoryExternalCatalog("sub")
+ val sub1 = new InMemoryExternalCatalog("sub1")
+ catalog.createSubCatalog("sub", sub, ignoreIfExists = false)
+ sub.createSubCatalog("sub1", sub1, ignoreIfExists = false)
+ sub1.createTable("table", createTableInstance(), ignoreIfExists = false)
+ val tables = catalog.getSubCatalog("sub").getSubCatalog("sub1").listTables()
+ assertEquals(1, tables.size())
+ assertEquals("table", tables.get(0))
}
- private def createTableInstance(dbName: String, tableName: String): ExternalCatalogTable = {
+ private def createTableInstance(): ExternalCatalogTable = {
val schema = new TableSchema(
Array("first", "second"),
Array(
@@ -138,9 +140,6 @@ class InMemoryExternalCatalogTest {
BasicTypeInfo.INT_TYPE_INFO
)
)
- ExternalCatalogTable(
- TableIdentifier(dbName, tableName),
- "csv",
- schema)
+ ExternalCatalogTable("csv", schema)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index a1bfd56..6a5c52f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -73,7 +73,6 @@ object CommonTestData {
properties1.put("fieldDelim", "#")
properties1.put("rowDelim", "$")
val externalCatalogTable1 = ExternalCatalogTable(
- TableIdentifier("db1", "tb1"),
"csv",
new TableSchema(
Array("a", "b", "c"),
@@ -107,7 +106,6 @@ object CommonTestData {
properties2.put("fieldDelim", "#")
properties2.put("rowDelim", "$")
val externalCatalogTable2 = ExternalCatalogTable(
- TableIdentifier("db2", "tb2"),
"csv",
new TableSchema(
Array("d", "e", "f", "g", "h"),
@@ -120,11 +118,16 @@ object CommonTestData {
),
properties2
)
- val catalog = new InMemoryExternalCatalog
- catalog.createDatabase(ExternalCatalogDatabase("db1"), false)
- catalog.createDatabase(ExternalCatalogDatabase("db2"), false)
- catalog.createTable(externalCatalogTable1, false)
- catalog.createTable(externalCatalogTable2, false)
+ val catalog = new InMemoryExternalCatalog("test")
+ val db1 = new InMemoryExternalCatalog("db1")
+ val db2 = new InMemoryExternalCatalog("db2")
+ catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
+ catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+
+ // Register the table with both catalogs
+ catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
+ db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
+ db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false)
catalog
}