You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/30 09:32:16 UTC
[flink] 02/06: [hotfix][table] Unify default catalog & builtin catalog naming
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 40a2128b2144ae47ba6a60b96e1a10719a94ef6f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 18 14:44:52 2019 +0200
[hotfix][table] Unify default catalog & builtin catalog naming
---
.../java/org/apache/flink/table/api/Table.java | 2 +-
.../table/api/internal/TableEnvironmentImpl.java | 21 +++++++---------
.../apache/flink/table/catalog/CatalogManager.java | 28 ++++++++++++++++------
.../flink/table/catalog/FunctionCatalog.java | 13 +++++-----
.../table/api/scala/BatchTableEnvironment.scala | 4 ++--
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 5 ++--
.../flink/table/api/internal/TableEnvImpl.scala | 27 +++++++++++----------
7 files changed, 56 insertions(+), 44 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 0087f94..70350fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -830,7 +830,7 @@ public interface Table {
/**
* Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
- * in the initial default catalog.
+ * in the built-in catalog.
*
* <p>A batch {@link Table} can only be written to a
* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 97d27f0..0b5d5fe 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -84,9 +84,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
// and this should always be true. This avoids too many hard code.
private static final boolean IS_STREAM_TABLE = true;
private final CatalogManager catalogManager;
-
- private final String builtinCatalogName;
- private final String builtinDatabaseName;
private final OperationTreeBuilder operationTreeBuilder;
private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
@@ -106,10 +103,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
this.execEnv = executor;
this.tableConfig = tableConfig;
- // The current catalog and database are definitely builtin,
- // see #create(EnvironmentSettings)
- this.builtinCatalogName = catalogManager.getCurrentCatalog();
- this.builtinDatabaseName = catalogManager.getCurrentDatabase();
this.functionCatalog = functionCatalog;
this.planner = planner;
@@ -485,8 +478,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
protected void registerTableInternal(String name, CatalogBaseTable table) {
try {
checkValidTableName(name);
- ObjectPath path = new ObjectPath(builtinDatabaseName, name);
- Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+ ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+ Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
if (catalog.isPresent()) {
catalog.get().createTable(
path,
@@ -500,8 +493,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
private void replaceTableInternal(String name, CatalogBaseTable table) {
try {
- ObjectPath path = new ObjectPath(builtinDatabaseName, name);
- Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+ ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+ Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
if (catalog.isPresent()) {
catalog.get().alterTable(
path,
@@ -521,7 +514,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
validateTableSource(tableSource);
- Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+ Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+ catalogManager.getBuiltInDatabaseName(), name);
if (table.isPresent()) {
if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
@@ -546,7 +540,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
- Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+ Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+ catalogManager.getBuiltInDatabaseName(), name);
if (table.isPresent()) {
if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c5d0bc7..5933487 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -64,8 +64,8 @@ public class CatalogManager {
private String currentDatabaseName;
- // The name of the default catalog
- private final String defaultCatalogName;
+ // The name of the built-in catalog
+ private final String builtInCatalogName;
/**
* Temporary solution to handle both {@link CatalogBaseTable} and
@@ -128,7 +128,9 @@ public class CatalogManager {
catalogs.put(defaultCatalogName, defaultCatalog);
this.currentCatalogName = defaultCatalogName;
this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
- this.defaultCatalogName = defaultCatalogName;
+
+ // right now the default catalog is always the built-in one
+ this.builtInCatalogName = defaultCatalogName;
}
/**
@@ -298,12 +300,24 @@ public class CatalogManager {
}
/**
- * Gets the default catalog name.
+ * Gets the built-in catalog name. The built-in catalog is used for storing all non-serializable
+ * transient meta-objects.
+ *
+ * @return the built-in catalog name
+ */
+ public String getBuiltInCatalogName() {
+ return builtInCatalogName;
+ }
+
+ /**
+ * Gets the built-in database name in the built-in catalog. The built-in database is used for storing
+ * all non-serializable transient meta-objects.
*
- * @return the default catalog
+ * @return the built-in database name
*/
- public String getDefaultCatalogName() {
- return defaultCatalogName;
+ public String getBuiltInDatabaseName() {
+ // The default database of the built-in catalog is also the built-in database.
+ return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
}
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 01639b8..1faffde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -150,9 +150,7 @@ public class FunctionCatalog implements FunctionLookup {
.map(FunctionDefinition::toString)
.collect(Collectors.toList()));
- return result.stream()
- .collect(Collectors.toList())
- .toArray(new String[0]);
+ return result.toArray(new String[0]);
}
@Override
@@ -180,7 +178,7 @@ public class FunctionCatalog implements FunctionLookup {
userCandidate)
);
} else {
- // TODO: should go thru function definition discover service
+ // TODO: should go through function definition discover service
}
} catch (FunctionNotExistException e) {
// Ignore
@@ -206,10 +204,11 @@ public class FunctionCatalog implements FunctionLookup {
.map(Function.identity());
}
- String defaultCatalogName = catalogManager.getDefaultCatalogName();
-
return foundDefinition.map(definition -> new FunctionLookup.Result(
- ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+ ObjectIdentifier.of(
+ catalogManager.getBuiltInCatalogName(),
+ catalogManager.getBuiltInDatabaseName(),
+ name),
definition)
);
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 6b3ecbc..516e6b1 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -295,11 +295,11 @@ object BatchTableEnvironment {
classOf[ExecutionEnvironment],
classOf[TableConfig],
classOf[CatalogManager])
- val defaultCatalog = "default_catalog"
+ val builtInCatalog = "default_catalog"
val catalogManager = new CatalogManager(
"default_catalog",
new GenericInMemoryCatalog(
- defaultCatalog,
+ builtInCatalog,
"default_database")
)
const.newInstance(executionEnvironment, tableConfig, catalogManager)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 79631f3..0923bba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -76,9 +76,10 @@ class FlinkRelMdHandlerTestBase {
val tableConfig = new TableConfig()
val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
- val defaultCatalog = "default_catalog"
+ val builtinCatalog = "default_catalog"
+ val builtinDatabase = "default_database"
val catalogManager = new CatalogManager(
- defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+ builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
// TODO batch RelNode and stream RelNode should have different PlannerContext
// and RelOptCluster due to they have different trait definitions.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 021a732..b75426c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -60,10 +60,6 @@ abstract class TableEnvImpl(
private val catalogManager: CatalogManager)
extends TableEnvironment {
- // The current catalog and database are definitely builtin.
- protected val builtinCatalogName: String = catalogManager.getCurrentCatalog
- protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
-
// Table API/SQL function catalog
private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
@@ -266,7 +262,9 @@ abstract class TableEnvImpl(
tableSource: TableSource[_])
: Unit = {
// register
- getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+ getCatalogTable(
+ catalogManager.getBuiltInCatalogName,
+ catalogManager.getBuiltInDatabaseName, name) match {
// check if a table (source or sink) is registered
case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -293,7 +291,10 @@ abstract class TableEnvImpl(
tableSink: TableSink[_])
: Unit = {
// check if a table (source or sink) is registered
- getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+ getCatalogTable(
+ catalogManager.getBuiltInCatalogName,
+ catalogManager.getBuiltInDatabaseName,
+ name) match {
// table source and/or sink is registered
case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -352,27 +353,29 @@ abstract class TableEnvImpl(
protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
checkValidTableName(name)
- val path = new ObjectPath(builtinDatabaseName, name)
- JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+ val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+ JavaScalaConversionUtil.toScala(
+ catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
case Some(catalog) =>
catalog.createTable(
path,
table,
false)
- case None => throw new TableException("The default catalog does not exist.")
+ case None => throw new TableException("The built-in catalog does not exist.")
}
}
protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
checkValidTableName(name)
- val path = new ObjectPath(builtinDatabaseName, name)
- JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+ val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+ JavaScalaConversionUtil.toScala(
+ catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
case Some(catalog) =>
catalog.alterTable(
path,
table,
false)
- case None => throw new TableException("The default catalog does not exist.")
+ case None => throw new TableException("The built-in catalog does not exist.")
}
}