You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/19 12:21:25 UTC

flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.

Repository: flink
Updated Branches:
  refs/heads/master 6ae759ae5 -> acea4cde5


[FLINK-6574] [table] Support nested catalogs in ExternalCatalog.

This closes #3913.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acea4cde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acea4cde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acea4cde

Branch: refs/heads/master
Commit: acea4cde5f0225db9e00bbef4a47fdb58419022b
Parents: 6ae759a
Author: Haohui Mai <wh...@apache.org>
Authored: Mon May 15 17:09:18 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 19 14:21:19 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/table/api/exceptions.scala |  48 +++----
 .../table/catalog/CrudExternalCatalog.scala     |  78 +++++-----
 .../flink/table/catalog/ExternalCatalog.scala   |  38 ++---
 .../table/catalog/ExternalCatalogDatabase.scala |  31 ----
 .../table/catalog/ExternalCatalogSchema.scala   |  91 +++++-------
 .../table/catalog/ExternalCatalogTable.scala    |  16 ---
 .../table/catalog/InMemoryExternalCatalog.scala | 142 +++++++------------
 .../flink/table/ExternalCatalogTest.scala       |  33 +++++
 .../catalog/InMemoryExternalCatalogTest.scala   | 103 +++++++-------
 .../flink/table/utils/CommonTestData.scala      |  17 ++-
 10 files changed, 259 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf75..7ea17fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends RuntimeException(msg)
 /**
   * Exception for an operation on a nonexistent table
   *
-  * @param db    database name
-  * @param table table name
-  * @param cause the cause
+  * @param catalog    catalog name
+  * @param table      table name
+  * @param cause      the cause
   */
 case class TableNotExistException(
-    db: String,
+    catalog: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"Table $db.$table does not exist.", cause) {
+    extends RuntimeException(s"Table $catalog.$table does not exist.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
   * Exception for adding an already existent table
   *
-  * @param db    database name
-  * @param table table name
-  * @param cause the cause
+  * @param catalog    catalog name
+  * @param table      table name
+  * @param cause      the cause
   */
 case class TableAlreadyExistException(
-    db: String,
+    catalog: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"Table $db.$table already exists.", cause) {
+    extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
-  * Exception for operation on a nonexistent database
+  * Exception for operation on a nonexistent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseNotExistException(
-    db: String,
+case class CatalogNotExistException(
+    catalog: String,
     cause: Throwable)
-    extends RuntimeException(s"Database $db does not exist.", cause) {
+    extends RuntimeException(s"Catalog $catalog does not exist.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**
-  * Exception for adding an already existent database
+  * Exception for adding an already existent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseAlreadyExistException(
-    db: String,
+case class CatalogAlreadyExistException(
+    catalog: String,
     cause: Throwable)
-    extends RuntimeException(s"Database $db already exists.", cause) {
+    extends RuntimeException(s"Catalog $catalog already exists.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index fcefa45..4db9497 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -21,88 +21,86 @@ package org.apache.flink.table.catalog
 import org.apache.flink.table.api._
 
 /**
-  * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables.
+  * The CrudExternalCatalog provides methods to create, drop, and alter (sub-)catalogs or tables.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
   /**
-    * Adds a table to the catalog.
+    * Adds a table to this catalog.
     *
-    * @param table          Description of the table to add
+    * @param tableName      The name of the table to add.
+    * @param table          The table to add.
     * @param ignoreIfExists Flag to specify behavior if a table with the given name already exists:
     *                       if set to false, it throws a TableAlreadyExistException,
     *                       if set to true, nothing happens.
-    * @throws DatabaseNotExistException  thrown if database does not exist
     * @throws TableAlreadyExistException thrown if table already exists and ignoreIfExists is false
     */
-  @throws[DatabaseNotExistException]
   @throws[TableAlreadyExistException]
-  def createTable(table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
+  def createTable(tableName: String, table: ExternalCatalogTable, ignoreIfExists: Boolean): Unit
 
   /**
-    * Deletes table from a database of the catalog.
+    * Deletes table from this catalog.
     *
-    * @param dbName            Name of the database
-    * @param tableName         Name of the table
-    * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+    * @param tableName         Name of the table to delete.
+    * @param ignoreIfNotExists Flag to specify behavior if the table does not exist:
     *                          if set to false, throw an exception,
     *                          if set to true, nothing happens.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
     * @throws TableNotExistException    thrown if the table does not exist in the catalog
     */
-  @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
-  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+  def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Modifies an existing table in the catalog.
+    * Modifies an existing table of this catalog.
     *
-    * @param table             New description of the table to update
-    * @param ignoreIfNotExists Flag to specify behavior if the table or database does not exist:
+    * @param tableName         The name of the table to modify.
+    * @param table             The new table which replaces the existing table.
+    * @param ignoreIfNotExists Flag to specify behavior if the table does not exist:
     *                          if set to false, throw an exception,
     *                          if set to true, nothing happens.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
-    * @throws TableNotExistException    thrown if the table does not exist in the catalog
+    * @throws TableNotExistException   thrown if the table does not exist in the catalog
     */
-  @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
-  def alterTable(table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
+  def alterTable(tableName: String, table: ExternalCatalogTable, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Adds a database to the catalog.
+    * Adds a subcatalog to this catalog.
     *
-    * @param db             Description of the database to create
-    * @param ignoreIfExists Flag to specify behavior if a database with the given name already
-    *                       exists: if set to false, it throws a DatabaseAlreadyExistException,
+    * @param name           The name of the sub catalog to add.
+    * @param catalog        Description of the catalog to add.
+    * @param ignoreIfExists Flag to specify behavior if a sub catalog with the given name already
+    *                       exists: if set to false, it throws a CatalogAlreadyExistException,
     *                       if set to true, nothing happens.
-    * @throws DatabaseAlreadyExistException thrown if the database does already exist in the catalog
-    *                                       and ignoreIfExists is false
+    * @throws CatalogAlreadyExistException
+    *                       thrown if the sub catalog does already exist in the catalog
+    *                       and ignoreIfExists is false
     */
-  @throws[DatabaseAlreadyExistException]
-  def createDatabase(db: ExternalCatalogDatabase, ignoreIfExists: Boolean): Unit
+  @throws[CatalogAlreadyExistException]
+  def createSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfExists: Boolean): Unit
 
   /**
-    * Deletes a database from the catalog.
+    * Deletes a sub catalog from this catalog.
     *
-    * @param dbName            Name of the database.
-    * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+    * @param name              Name of the sub catalog to delete.
+    * @param ignoreIfNotExists Flag to specify behavior if the catalog does not exist:
     *                          if set to false, throw an exception,
     *                          if set to true, nothing happens.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog
     */
-  @throws[DatabaseNotExistException]
-  def dropDatabase(dbName: String, ignoreIfNotExists: Boolean): Unit
+  @throws[CatalogNotExistException]
+  def dropSubCatalog(name: String, ignoreIfNotExists: Boolean): Unit
 
   /**
-    * Modifies an existing database in the catalog.
+    * Modifies an existing sub catalog of this catalog.
     *
-    * @param db                New description of the database to update
-    * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+    * @param name              Name of the catalog to modify.
+    * @param catalog           The new sub catalog to replace the existing sub catalog.
+    * @param ignoreIfNotExists Flag to specify behavior if the sub catalog does not exist:
     *                          if set to false, throw an exception,
     *                          if set to true, nothing happens.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+    * @throws CatalogNotExistException thrown if the sub catalog does not exist in the catalog
     */
-  @throws[DatabaseNotExistException]
-  def alterDatabase(db: ExternalCatalogDatabase, ignoreIfNotExists: Boolean): Unit
+  @throws[CatalogNotExistException]
+  def alterSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfNotExists: Boolean): Unit
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 00a35e4..5f4511b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -26,49 +26,41 @@ import org.apache.flink.table.api._
   * An [[ExternalCatalog]] is the connector between an external database catalog and Flink's
   * Table API.
   *
-  * It provides information about databases and tables such as names, schema, statistics, and
-  * access information.
+  * It provides information about catalogs, databases and tables such as names, schema, statistics,
+  * and access information.
   */
 trait ExternalCatalog {
 
   /**
-    * Get a table from the catalog
+    * Get a table from this catalog.
     *
-    * @param dbName    The name of the table's database.
     * @param tableName The name of the table.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog.
     * @throws TableNotExistException    thrown if the table does not exist in the catalog.
-    * @return the requested table
+    * @return The requested table.
     */
-  @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
-  def getTable(dbName: String, tableName: String): ExternalCatalogTable
+  def getTable(tableName: String): ExternalCatalogTable
 
   /**
-    * Get a list of all table names of a database in the catalog.
+    * Gets the names of all tables registered in this catalog.
     *
-    * @param dbName The name of the database.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
-    * @return The list of table names
+    * @return A list of the names of all registered tables.
     */
-  @throws[DatabaseNotExistException]
-  def listTables(dbName: String): JList[String]
+  def listTables(): JList[String]
 
   /**
-    * Gets a database from the catalog.
+    * Gets a sub catalog from this catalog.
     *
-    * @param dbName The name of the database.
-    * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
-    * @return The requested database
+    * @return The requested sub catalog.
     */
-  @throws[DatabaseNotExistException]
-  def getDatabase(dbName: String): ExternalCatalogDatabase
+  @throws[CatalogNotExistException]
+  def getSubCatalog(dbName: String): ExternalCatalog
 
   /**
-    * Gets a list of all databases in the catalog.
+    * Gets the names of all sub catalogs registered in this catalog.
     *
-    * @return The list of database names
+    * @return The list of the names of all registered sub catalogs.
     */
-  def listDatabases(): JList[String]
+  def listSubCatalogs(): JList[String]
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
deleted file mode 100644
index 99ab2eb..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogDatabase.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-
-/**
-  * Defines a database in an [[ExternalCatalog]].
-  *
-  * @param dbName     The name of the database
-  * @param properties The properties of the database
-  */
-case class ExternalCatalogDatabase(
-    dbName: String,
-    properties: JMap[String, String] = new JHashMap())

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 8e010fa..ad96e77 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -22,18 +22,16 @@ import java.util.{Collections => JCollections, Collection => JCollection, Linked
 
 import org.apache.calcite.linq4j.tree.Expression
 import org.apache.calcite.schema._
-import org.apache.flink.table.api.{DatabaseNotExistException, TableNotExistException}
+import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 
 /**
-  * This class is responsible for connect external catalog to calcite catalog.
-  * In this way, it is possible to look-up and access tables in SQL queries
-  * without registering tables in advance.
-  * The databases in the external catalog registers as calcite sub-Schemas of current schema.
-  * The tables in a given database registers as calcite tables
-  * of the [[ExternalCatalogDatabaseSchema]].
+  * This class is responsible to connect an external catalog to Calcite's catalog.
+  * This enables to look-up and access tables in SQL queries without registering tables in advance.
+  * The the external catalog and all included sub-catalogs and tables is registered as
+  * sub-schemas and tables in Calcite.
   *
   * @param catalogIdentifier external catalog name
   * @param catalog           external catalog
@@ -45,32 +43,47 @@ class ExternalCatalogSchema(
   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
 
   /**
-    * Looks up database by the given sub-schema name in the external catalog,
-    * returns it Wrapped in a [[ExternalCatalogDatabaseSchema]] with the given database name.
+    * Looks up a sub-schema by the given sub-schema name in the external catalog.
+    * Returns it wrapped in a [[ExternalCatalogSchema]] with the given database name.
     *
-    * @param name Sub-schema name
-    * @return Sub-schema with a given name, or null
+    * @param name Name of sub-schema to look up.
+    * @return Sub-schema with a given name, or null.
     */
   override def getSubSchema(name: String): Schema = {
     try {
-      val db = catalog.getDatabase(name)
-      new ExternalCatalogDatabaseSchema(db.dbName, catalog)
+      val db = catalog.getSubCatalog(name)
+      new ExternalCatalogSchema(name, db)
     } catch {
-      case e: DatabaseNotExistException =>
-        LOG.warn(s"Database $name does not exist in externalCatalog $catalogIdentifier")
+      case _: CatalogNotExistException =>
+        LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
         null
     }
   }
 
   /**
-    * Lists the databases of the external catalog,
-    * returns the lists as the names of this schema's sub-schemas.
+    * Lists the sub-schemas of the external catalog.
+    * Returns a list of names of this schema's sub-schemas.
     *
     * @return names of this schema's child schemas
     */
-  override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listDatabases())
+  override def getSubSchemaNames: JSet[String] = new JLinkedHashSet(catalog.listSubCatalogs())
 
-  override def getTable(name: String): Table = null
+  /**
+    * Looks up and returns a table from this schema.
+    * Returns null if no table is found for the given name.
+    *
+    * @param name The name of the table to look up.
+    * @return The table or null if no table is found.
+    */
+  override def getTable(name: String): Table = try {
+    val externalCatalogTable = catalog.getTable(name)
+    ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
+  } catch {
+    case TableNotExistException(table, _, _) => {
+      LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier")
+      null
+    }
+  }
 
   override def isMutable: Boolean = true
 
@@ -91,46 +104,8 @@ class ExternalCatalogSchema(
     * @param plusOfThis
     */
   def registerSubSchemas(plusOfThis: SchemaPlus) {
-    catalog.listDatabases().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db)))
-  }
-
-  private class ExternalCatalogDatabaseSchema(
-      schemaName: String,
-      flinkExternalCatalog: ExternalCatalog) extends Schema {
-
-    override def getTable(name: String): Table = {
-      try {
-        val externalCatalogTable = flinkExternalCatalog.getTable(schemaName, name)
-        ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
-      } catch {
-        case TableNotExistException(db, table, cause) => {
-          LOG.warn(s"Table $db.$table does not exist in externalCatalog $catalogIdentifier")
-          null
-        }
-      }
-    }
-
-    override def getTableNames: JSet[String] =
-      new JLinkedHashSet(flinkExternalCatalog.listTables(schemaName))
-
-    override def getSubSchema(name: String): Schema = null
-
-    override def getSubSchemaNames: JSet[String] = JCollections.emptySet[String]
-
-    override def isMutable: Boolean = true
-
-    override def getFunctions(name: String): JCollection[Function] =
-      JCollections.emptyList[Function]
-
-    override def getExpression(parentSchema: SchemaPlus, name: String): Expression =
-      Schemas.subSchemaExpression(parentSchema, name, getClass)
-
-    override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
-
-    override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = true
-
+    catalog.listSubCatalogs().asScala.foreach(db => plusOfThis.add(db, getSubSchema(db)))
   }
-
 }
 
 object ExternalCatalogSchema {

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 01eca6d..ae20718 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.plan.stats.TableStats
 /**
   * Defines a table in an [[ExternalCatalog]].
   *
-  * @param identifier           Identifier of the table (database name and table name)
   * @param tableType            Table type, e.g csv, hbase, kafka
   * @param schema               Schema of the table (column names and types)
   * @param properties           Properties of the table
@@ -37,7 +36,6 @@ import org.apache.flink.table.plan.stats.TableStats
   * @param lastAccessTime       Timestamp of last access of the table
   */
 case class ExternalCatalogTable(
-    identifier: TableIdentifier,
     tableType: String,
     schema: TableSchema,
     properties: JMap[String, String] = new JHashMap(),
@@ -45,17 +43,3 @@ case class ExternalCatalogTable(
     comment: String = null,
     createTime: JLong = System.currentTimeMillis,
     lastAccessTime: JLong = -1L)
-
-/**
-  * Identifier for a catalog table.
-  *
-  * @param database Database name
-  * @param table    Table name
-  */
-case class TableIdentifier(
-    database: String,
-    table: String) {
-
-  override def toString: String = s"$database.$table"
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
index 6a61916..ee30a8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
@@ -18,138 +18,106 @@
 
 package org.apache.flink.table.catalog
 
-import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
 import java.util.{List => JList}
 
-import scala.collection.mutable.HashMap
+import org.apache.flink.table.api.{CatalogAlreadyExistException, CatalogNotExistException, TableAlreadyExistException, TableNotExistException}
+
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 /**
   * This class is an in-memory implementation of [[ExternalCatalog]].
   *
+  * @param name      The name of the catalog
+  *
   * It could be used for testing or developing instead of used in production environment.
   */
-class InMemoryExternalCatalog extends CrudExternalCatalog {
+class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
 
-  private val databases = new HashMap[String, Database]
+  private val databases = new mutable.HashMap[String, ExternalCatalog]
+  private val tables = new mutable.HashMap[String, ExternalCatalogTable]
 
-  @throws[DatabaseNotExistException]
   @throws[TableAlreadyExistException]
   override def createTable(
-      table: ExternalCatalogTable,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    val dbName = table.identifier.database
-    val tables = getTables(dbName)
-    val tableName = table.identifier.table
-    if (tables.contains(tableName)) {
-      if (!ignoreIfExists) {
-        throw new TableAlreadyExistException(dbName, tableName)
-      }
-    } else {
-      tables.put(tableName, table)
+    tableName: String,
+    table: ExternalCatalogTable,
+    ignoreIfExists: Boolean): Unit = synchronized {
+    tables.get(tableName) match {
+      case Some(_) if !ignoreIfExists => throw new TableAlreadyExistException(name, tableName)
+      case _ => tables.put(tableName, table)
     }
   }
 
-  @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
-  override def dropTable(
-      dbName: String,
-      tableName: String,
-      ignoreIfNotExists: Boolean): Unit = synchronized {
-    val tables = getTables(dbName)
+  override def dropTable(tableName: String, ignoreIfNotExists: Boolean): Unit = synchronized {
     if (tables.remove(tableName).isEmpty && !ignoreIfNotExists) {
-      throw new TableNotExistException(dbName, tableName)
+      throw new TableNotExistException(name, tableName)
     }
   }
 
-  @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
   override def alterTable(
-      table: ExternalCatalogTable,
-      ignoreIfNotExists: Boolean): Unit = synchronized {
-    val dbName = table.identifier.database
-    val tables = getTables(dbName)
-    val tableName = table.identifier.table
+    tableName: String,
+    table: ExternalCatalogTable,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
     if (tables.contains(tableName)) {
       tables.put(tableName, table)
     } else if (!ignoreIfNotExists) {
-      throw new TableNotExistException(dbName, tableName)
+      throw new TableNotExistException(name, tableName)
     }
   }
 
-  @throws[DatabaseNotExistException]
-  override def listTables(dbName: String): JList[String] = synchronized {
-    val tables = getTables(dbName)
-    tables.keys.toList.asJava
-  }
-
-  @throws[DatabaseNotExistException]
-  @throws[TableNotExistException]
-  override def getTable(dbName: String, tableName: String): ExternalCatalogTable = synchronized {
-    val tables = getTables(dbName)
-    tables.get(tableName) match {
-      case Some(table) => table
-      case None => throw new TableNotExistException(dbName, tableName)
+  @throws[CatalogAlreadyExistException]
+  override def createSubCatalog(
+    catalogName: String,
+    catalog: ExternalCatalog,
+    ignoreIfExists: Boolean): Unit = synchronized {
+    databases.get(catalogName) match {
+      case Some(_) if !ignoreIfExists => throw CatalogAlreadyExistException(catalogName, null)
+      case _ => databases.put(catalogName, catalog)
     }
   }
 
-  @throws[DatabaseAlreadyExistException]
-  override def createDatabase(
-      db: ExternalCatalogDatabase,
-      ignoreIfExists: Boolean): Unit = synchronized {
-    val dbName = db.dbName
-    if (databases.contains(dbName)) {
-      if (!ignoreIfExists) {
-        throw new DatabaseAlreadyExistException(dbName)
-      }
-    } else {
-      databases.put(dbName, new Database(db))
+  @throws[CatalogNotExistException]
+  override def dropSubCatalog(
+    catalogName: String,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+    if (databases.remove(catalogName).isEmpty && !ignoreIfNotExists) {
+      throw CatalogNotExistException(catalogName, null)
     }
   }
 
-  @throws[DatabaseNotExistException]
-  override def alterDatabase(
-      db: ExternalCatalogDatabase,
-      ignoreIfNotExists: Boolean): Unit = synchronized {
-    val dbName = db.dbName
-    databases.get(dbName) match {
-      case Some(database) => database.db = db
-      case None =>
-        if (!ignoreIfNotExists) {
-          throw new DatabaseNotExistException(dbName)
-        }
+  override def alterSubCatalog(
+    catalogName: String,
+    catalog: ExternalCatalog,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+    if (databases.contains(catalogName)) {
+      databases.put(catalogName, catalog)
+    } else if (!ignoreIfNotExists) {
+      throw new CatalogNotExistException(catalogName)
     }
   }
 
-  @throws[DatabaseNotExistException]
-  override def dropDatabase(
-      dbName: String,
-      ignoreIfNotExists: Boolean): Unit = synchronized {
-    if (databases.remove(dbName).isEmpty && !ignoreIfNotExists) {
-      throw new DatabaseNotExistException(dbName)
+  override def getTable(tableName: String): ExternalCatalogTable = synchronized {
+    tables.get(tableName) match {
+      case Some(t) => t
+      case _ => throw TableNotExistException(name, tableName, null)
     }
   }
 
-  override def listDatabases(): JList[String] = synchronized {
-    databases.keys.toList.asJava
+  override def listTables(): JList[String] = synchronized {
+    tables.keys.toList.asJava
   }
 
-  @throws[DatabaseNotExistException]
-  override def getDatabase(dbName: String): ExternalCatalogDatabase = synchronized {
-    databases.get(dbName) match {
-      case Some(database) => database.db
-      case None => throw new DatabaseNotExistException(dbName)
+  @throws[CatalogNotExistException]
+  override def getSubCatalog(catalogName: String): ExternalCatalog = synchronized {
+    databases.get(catalogName) match {
+      case Some(d) => d
+      case _ => throw CatalogNotExistException(catalogName, null)
     }
   }
 
-  private def getTables(db: String): HashMap[String, ExternalCatalogTable] =
-    databases.get(db) match {
-      case Some(database) => database.tables
-      case None => throw new DatabaseNotExistException(db)
-    }
-
-  private class Database(var db: ExternalCatalogDatabase) {
-    val tables = new HashMap[String, ExternalCatalogTable]
+  override def listSubCatalogs(): JList[String] = synchronized {
+    databases.keys.toList.asJava
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
index d801644..27dd8d8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -28,6 +28,7 @@ import org.junit.Test
   */
 class ExternalCatalogTest extends TableTestBase {
   private val table1Path: Array[String] = Array("test", "db1", "tb1")
+  private val table1TopLevelPath: Array[String] = Array("test", "tb1")
   private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
   private val table2Path: Array[String] = Array("test", "db2", "tb2")
   private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
@@ -148,6 +149,38 @@ class ExternalCatalogTest extends TableTestBase {
     util.verifySql(sqlQuery, expected)
   }
 
+
+  @Test
+  def testTopLevelTable(): Unit = {
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tEnv.scan("test", "tb1")
+    val table2 = tEnv.scan("test", "db2", "tb2")
+    val result = table2
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
   def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
     s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
         s"fields=[${fields.mkString(", ")}])"

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 5402780..6d1d66f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -31,15 +31,14 @@ class InMemoryExternalCatalogTest {
 
   @Before
   def setUp(): Unit = {
-    catalog = new InMemoryExternalCatalog()
-    catalog.createDatabase(ExternalCatalogDatabase(databaseName), ignoreIfExists = false)
+    catalog = new InMemoryExternalCatalog(databaseName)
   }
 
   @Test
   def testCreateTable(): Unit = {
-    assertTrue(catalog.listTables(databaseName).isEmpty)
-    catalog.createTable(createTableInstance(databaseName, "t1"), ignoreIfExists = false)
-    val tables = catalog.listTables(databaseName)
+    assertTrue(catalog.listTables().isEmpty)
+    catalog.createTable("t1", createTableInstance(), ignoreIfExists = false)
+    val tables = catalog.listTables()
     assertEquals(1, tables.size())
     assertEquals("t1", tables.get(0))
   }
@@ -47,36 +46,31 @@ class InMemoryExternalCatalogTest {
   @Test(expected = classOf[TableAlreadyExistException])
   def testCreateExistedTable(): Unit = {
     val tableName = "t1"
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
+    catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
+    catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
   }
 
   @Test
   def testGetTable(): Unit = {
-    val originTable = createTableInstance(databaseName, "t1")
-    catalog.createTable(originTable, false)
-    assertEquals(catalog.getTable(databaseName, "t1"), originTable)
-  }
-
-  @Test(expected = classOf[DatabaseNotExistException])
-  def testGetTableUnderNotExistDatabaseName(): Unit = {
-    catalog.getTable("notexistedDb", "t1")
+    val originTable = createTableInstance()
+    catalog.createTable("t1", originTable, ignoreIfExists = false)
+    assertEquals(catalog.getTable("t1"), originTable)
   }
 
   @Test(expected = classOf[TableNotExistException])
   def testGetNotExistTable(): Unit = {
-    catalog.getTable(databaseName, "t1")
+    catalog.getTable("nonexisted")
   }
 
   @Test
   def testAlterTable(): Unit = {
     val tableName = "t1"
-    val table = createTableInstance(databaseName, tableName)
-    catalog.createTable(table, false)
-    assertEquals(catalog.getTable(databaseName, tableName), table)
-    val newTable = createTableInstance(databaseName, tableName)
-    catalog.alterTable(newTable, false)
-    val currentTable = catalog.getTable(databaseName, tableName)
+    val table = createTableInstance()
+    catalog.createTable(tableName, table, ignoreIfExists = false)
+    assertEquals(catalog.getTable(tableName), table)
+    val newTable = createTableInstance()
+    catalog.alterTable(tableName, newTable, ignoreIfNotExists = false)
+    val currentTable = catalog.getTable(tableName)
     // validate the table is really replaced after alter table
     assertNotEquals(table, currentTable)
     assertEquals(newTable, currentTable)
@@ -84,53 +78,61 @@ class InMemoryExternalCatalogTest {
 
   @Test(expected = classOf[TableNotExistException])
   def testAlterNotExistTable(): Unit = {
-    catalog.alterTable(createTableInstance(databaseName, "t1"), false)
+    catalog.alterTable("nonexisted", createTableInstance(), ignoreIfNotExists = false)
   }
 
   @Test
   def testDropTable(): Unit = {
     val tableName = "t1"
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
-    assertTrue(catalog.listTables(databaseName).contains(tableName))
-    catalog.dropTable(databaseName, tableName, false)
-    assertFalse(catalog.listTables(databaseName).contains(tableName))
+    catalog.createTable(tableName, createTableInstance(), ignoreIfExists = false)
+    assertTrue(catalog.listTables().contains(tableName))
+    catalog.dropTable(tableName, ignoreIfNotExists = false)
+    assertFalse(catalog.listTables().contains(tableName))
   }
 
   @Test(expected = classOf[TableNotExistException])
   def testDropNotExistTable(): Unit = {
-    catalog.dropTable(databaseName, "t1", false)
-  }
-
-  @Test
-  def testListDatabases(): Unit = {
-    val databases = catalog.listDatabases()
-    assertEquals(1, databases.size())
-    assertEquals(databaseName, databases.get(0))
-  }
-
-  @Test
-  def testGetDatabase(): Unit = {
-    assertNotNull(catalog.getDatabase(databaseName))
+    catalog.dropTable("nonexisted", ignoreIfNotExists = false)
   }
 
-  @Test(expected = classOf[DatabaseNotExistException])
+  @Test(expected = classOf[CatalogNotExistException])
   def testGetNotExistDatabase(): Unit = {
-    catalog.getDatabase("notexistedDb")
+    catalog.getSubCatalog("notexistedDb")
   }
 
   @Test
   def testCreateDatabase(): Unit = {
-    val originDatabasesNum = catalog.listDatabases().size
-    catalog.createDatabase(ExternalCatalogDatabase("db2"), false)
-    assertEquals(catalog.listDatabases().size, originDatabasesNum + 1)
+    catalog.createSubCatalog("db2", new InMemoryExternalCatalog("db2"), ignoreIfExists = false)
+    assertEquals(1, catalog.listSubCatalogs().size)
   }
 
-  @Test(expected = classOf[DatabaseAlreadyExistException])
+  @Test(expected = classOf[CatalogAlreadyExistException])
   def testCreateExistedDatabase(): Unit = {
-    catalog.createDatabase(ExternalCatalogDatabase(databaseName), false)
+    catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
+      ignoreIfExists = false)
+
+    assertNotNull(catalog.getSubCatalog("existed"))
+    val databases = catalog.listSubCatalogs()
+    assertEquals(1, databases.size())
+    assertEquals("existed", databases.get(0))
+
+    catalog.createSubCatalog("existed", new InMemoryExternalCatalog("existed"),
+      ignoreIfExists = false)
+  }
+
+  @Test
+  def testNestedCatalog(): Unit = {
+    val sub = new InMemoryExternalCatalog("sub")
+    val sub1 = new InMemoryExternalCatalog("sub1")
+    catalog.createSubCatalog("sub", sub, ignoreIfExists = false)
+    sub.createSubCatalog("sub1", sub1, ignoreIfExists = false)
+    sub1.createTable("table", createTableInstance(), ignoreIfExists = false)
+    val tables = catalog.getSubCatalog("sub").getSubCatalog("sub1").listTables()
+    assertEquals(1, tables.size())
+    assertEquals("table", tables.get(0))
   }
 
-  private def createTableInstance(dbName: String, tableName: String): ExternalCatalogTable = {
+  private def createTableInstance(): ExternalCatalogTable = {
     val schema = new TableSchema(
       Array("first", "second"),
       Array(
@@ -138,9 +140,6 @@ class InMemoryExternalCatalogTest {
         BasicTypeInfo.INT_TYPE_INFO
       )
     )
-    ExternalCatalogTable(
-      TableIdentifier(dbName, tableName),
-      "csv",
-      schema)
+    ExternalCatalogTable("csv", schema)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index a1bfd56..6a5c52f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -73,7 +73,6 @@ object CommonTestData {
     properties1.put("fieldDelim", "#")
     properties1.put("rowDelim", "$")
     val externalCatalogTable1 = ExternalCatalogTable(
-      TableIdentifier("db1", "tb1"),
       "csv",
       new TableSchema(
         Array("a", "b", "c"),
@@ -107,7 +106,6 @@ object CommonTestData {
     properties2.put("fieldDelim", "#")
     properties2.put("rowDelim", "$")
     val externalCatalogTable2 = ExternalCatalogTable(
-      TableIdentifier("db2", "tb2"),
       "csv",
       new TableSchema(
         Array("d", "e", "f", "g", "h"),
@@ -120,11 +118,16 @@ object CommonTestData {
       ),
       properties2
     )
-    val catalog = new InMemoryExternalCatalog
-    catalog.createDatabase(ExternalCatalogDatabase("db1"), false)
-    catalog.createDatabase(ExternalCatalogDatabase("db2"), false)
-    catalog.createTable(externalCatalogTable1, false)
-    catalog.createTable(externalCatalogTable2, false)
+    val catalog = new InMemoryExternalCatalog("test")
+    val db1 = new InMemoryExternalCatalog("db1")
+    val db2 = new InMemoryExternalCatalog("db2")
+    catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
+    catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+
+    // Register the table with both catalogs
+    catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
+    db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
+    db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false)
     catalog
   }