You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/30 09:32:16 UTC

[flink] 02/06: [hotfix][table] Unify default catalog & builtin catalog naming

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 40a2128b2144ae47ba6a60b96e1a10719a94ef6f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 18 14:44:52 2019 +0200

    [hotfix][table] Unify default catalog & builtin catalog naming
---
 .../java/org/apache/flink/table/api/Table.java     |  2 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 21 +++++++---------
 .../apache/flink/table/catalog/CatalogManager.java | 28 ++++++++++++++++------
 .../flink/table/catalog/FunctionCatalog.java       | 13 +++++-----
 .../table/api/scala/BatchTableEnvironment.scala    |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  5 ++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 27 +++++++++++----------
 7 files changed, 56 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 0087f94..70350fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -830,7 +830,7 @@ public interface Table {
 
 	/**
 	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
-	 * in the initial default catalog.
+	 * in the built-in catalog.
 	 *
 	 * <p>A batch {@link Table} can only be written to a
 	 * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 97d27f0..0b5d5fe 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -84,9 +84,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	// and this should always be true. This avoids too many hard code.
 	private static final boolean IS_STREAM_TABLE = true;
 	private final CatalogManager catalogManager;
-
-	private final String builtinCatalogName;
-	private final String builtinDatabaseName;
 	private final OperationTreeBuilder operationTreeBuilder;
 	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
@@ -106,10 +103,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		this.execEnv = executor;
 
 		this.tableConfig = tableConfig;
-		// The current catalog and database are definitely builtin,
-		// see #create(EnvironmentSettings)
-		this.builtinCatalogName = catalogManager.getCurrentCatalog();
-		this.builtinDatabaseName = catalogManager.getCurrentDatabase();
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
@@ -485,8 +478,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	protected void registerTableInternal(String name, CatalogBaseTable table) {
 		try {
 			checkValidTableName(name);
-			ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-			Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+			ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+			Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
 			if (catalog.isPresent()) {
 				catalog.get().createTable(
 					path,
@@ -500,8 +493,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private void replaceTableInternal(String name, CatalogBaseTable table) {
 		try {
-			ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-			Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+			ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+			Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
 			if (catalog.isPresent()) {
 				catalog.get().alterTable(
 					path,
@@ -521,7 +514,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
-		Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+		Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+			catalogManager.getBuiltInDatabaseName(), name);
 
 		if (table.isPresent()) {
 			if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
@@ -546,7 +540,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
-		Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+		Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+			catalogManager.getBuiltInDatabaseName(), name);
 
 		if (table.isPresent()) {
 			if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c5d0bc7..5933487 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -64,8 +64,8 @@ public class CatalogManager {
 
 	private String currentDatabaseName;
 
-	// The name of the default catalog
-	private final String defaultCatalogName;
+	// The name of the built-in catalog
+	private final String builtInCatalogName;
 
 	/**
 	 * Temporary solution to handle both {@link CatalogBaseTable} and
@@ -128,7 +128,9 @@ public class CatalogManager {
 		catalogs.put(defaultCatalogName, defaultCatalog);
 		this.currentCatalogName = defaultCatalogName;
 		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
-		this.defaultCatalogName = defaultCatalogName;
+
+		// right now the default catalog is always the built-in one
+		this.builtInCatalogName = defaultCatalogName;
 	}
 
 	/**
@@ -298,12 +300,24 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Gets the default catalog name.
+	 * Gets the built-in catalog name. The built-in catalog is used for storing all non-serializable
+	 * transient meta-objects.
+	 *
+	 * @return the built-in catalog name
+	 */
+	public String getBuiltInCatalogName() {
+		return builtInCatalogName;
+	}
+
+	/**
+	 * Gets the built-in database name in the built-in catalog. The built-in database is used for storing
+	 * all non-serializable transient meta-objects.
 	 *
-	 * @return the default catalog
+	 * @return the built-in database name
 	 */
-	public String getDefaultCatalogName() {
-		return defaultCatalogName;
+	public String getBuiltInDatabaseName() {
+		// The default database of the built-in catalog is also the built-in database.
+		return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 01639b8..1faffde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -150,9 +150,7 @@ public class FunctionCatalog implements FunctionLookup {
 				.map(FunctionDefinition::toString)
 				.collect(Collectors.toList()));
 
-		return result.stream()
-			.collect(Collectors.toList())
-			.toArray(new String[0]);
+		return result.toArray(new String[0]);
 	}
 
 	@Override
@@ -180,7 +178,7 @@ public class FunctionCatalog implements FunctionLookup {
 						userCandidate)
 				);
 			} else {
-				// TODO: should go thru function definition discover service
+				// TODO: should go through function definition discover service
 			}
 		} catch (FunctionNotExistException e) {
 			// Ignore
@@ -206,10 +204,11 @@ public class FunctionCatalog implements FunctionLookup {
 				.map(Function.identity());
 		}
 
-		String defaultCatalogName = catalogManager.getDefaultCatalogName();
-
 		return foundDefinition.map(definition -> new FunctionLookup.Result(
-			ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+			ObjectIdentifier.of(
+				catalogManager.getBuiltInCatalogName(),
+				catalogManager.getBuiltInDatabaseName(),
+				name),
 			definition)
 		);
 	}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 6b3ecbc..516e6b1 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -295,11 +295,11 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
-      val defaultCatalog = "default_catalog"
+      val builtInCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
         "default_catalog",
         new GenericInMemoryCatalog(
-          defaultCatalog,
+          builtInCatalog,
           "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 79631f3..0923bba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -76,9 +76,10 @@ class FlinkRelMdHandlerTestBase {
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
 
-  val defaultCatalog = "default_catalog"
+  val builtinCatalog = "default_catalog"
+  val builtinDatabase = "default_database"
   val catalogManager = new CatalogManager(
-    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+    builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
 
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 021a732..b75426c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -60,10 +60,6 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  // The current catalog and database are definitely builtin.
-  protected val builtinCatalogName: String = catalogManager.getCurrentCatalog
-  protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
-
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
 
@@ -266,7 +262,9 @@ abstract class TableEnvImpl(
     tableSource: TableSource[_])
   : Unit = {
     // register
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName, name) match {
 
       // check if a table (source or sink) is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -293,7 +291,10 @@ abstract class TableEnvImpl(
     tableSink: TableSink[_])
   : Unit = {
     // check if a table (source or sink) is registered
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName,
+      name) match {
 
       // table source and/or sink is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -352,27 +353,29 @@ abstract class TableEnvImpl(
 
   protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.createTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }
 
   protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.alterTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }