You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/30 09:32:14 UTC

[flink] branch master updated (589fd4d -> 549079c)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 589fd4d  [hotfix][python] Change "flink-python-" to "flink-python" for the change  of artifact of flink-python module (#9270)
     new 208fbea  [hotfix][table-api-java] Improve documentation of withBuiltinCatalogName & withBuiltinDatabaseName
     new 40a2128  [hotfix][table] Unify default catalog & builtin catalog naming
     new 8d37909  [FLINK-13350][table-api-java] Annotate useCatalog & useDatabase with experimental annotation
     new daee14f  [hotfix][table-common] Fix logging in the table source validation procedure
     new b1db13f  [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client
     new 549079c  [hotfix][sql-client] Add USE CATALOG/USE to CLI help

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/annotation/Experimental.java  |   2 +-
 .../apache/flink/table/client/cli/CliClient.java   |   2 +-
 .../apache/flink/table/client/cli/CliStrings.java  |  20 ++--
 .../flink/table/client/cli/SqlCommandParser.java   |   2 +-
 .../client/gateway/local/ExecutionContext.java     |  28 ++---
 .../table/client/gateway/local/LocalExecutor.java  |   7 +-
 .../table/client/cli/SqlCommandParserTest.java     |   2 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  23 +++-
 .../client/gateway/utils/SimpleCatalogFactory.java | 118 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../test/resources/test-sql-client-defaults.yaml   |   3 +
 .../flink/table/api/EnvironmentSettings.java       |  29 ++++-
 .../java/org/apache/flink/table/api/Table.java     |   2 +-
 .../apache/flink/table/api/TableEnvironment.java   |   3 +
 .../table/api/internal/TableEnvironmentImpl.java   |  21 ++--
 .../apache/flink/table/catalog/CatalogManager.java |  28 +++--
 .../flink/table/catalog/FunctionCatalog.java       |  13 ++-
 .../table/api/scala/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/sources/TableSourceValidation.java |   6 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   5 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  27 ++---
 .../nodes/datastream/StreamTableSourceScan.scala   |   2 +-
 22 files changed, 266 insertions(+), 82 deletions(-)
 create mode 100644 flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java


[flink] 02/06: [hotfix][table] Unify default catalog & builtin catalog naming

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 40a2128b2144ae47ba6a60b96e1a10719a94ef6f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 18 14:44:52 2019 +0200

    [hotfix][table] Unify default catalog & builtin catalog naming
---
 .../java/org/apache/flink/table/api/Table.java     |  2 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 21 +++++++---------
 .../apache/flink/table/catalog/CatalogManager.java | 28 ++++++++++++++++------
 .../flink/table/catalog/FunctionCatalog.java       | 13 +++++-----
 .../table/api/scala/BatchTableEnvironment.scala    |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  5 ++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 27 +++++++++++----------
 7 files changed, 56 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 0087f94..70350fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -830,7 +830,7 @@ public interface Table {
 
 	/**
 	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
-	 * in the initial default catalog.
+	 * in the built-in catalog.
 	 *
 	 * <p>A batch {@link Table} can only be written to a
 	 * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 97d27f0..0b5d5fe 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -84,9 +84,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	// and this should always be true. This avoids too many hard code.
 	private static final boolean IS_STREAM_TABLE = true;
 	private final CatalogManager catalogManager;
-
-	private final String builtinCatalogName;
-	private final String builtinDatabaseName;
 	private final OperationTreeBuilder operationTreeBuilder;
 	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
@@ -106,10 +103,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		this.execEnv = executor;
 
 		this.tableConfig = tableConfig;
-		// The current catalog and database are definitely builtin,
-		// see #create(EnvironmentSettings)
-		this.builtinCatalogName = catalogManager.getCurrentCatalog();
-		this.builtinDatabaseName = catalogManager.getCurrentDatabase();
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
@@ -485,8 +478,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	protected void registerTableInternal(String name, CatalogBaseTable table) {
 		try {
 			checkValidTableName(name);
-			ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-			Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+			ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+			Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
 			if (catalog.isPresent()) {
 				catalog.get().createTable(
 					path,
@@ -500,8 +493,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private void replaceTableInternal(String name, CatalogBaseTable table) {
 		try {
-			ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-			Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+			ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+			Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
 			if (catalog.isPresent()) {
 				catalog.get().alterTable(
 					path,
@@ -521,7 +514,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
 		validateTableSource(tableSource);
-		Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+		Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+			catalogManager.getBuiltInDatabaseName(), name);
 
 		if (table.isPresent()) {
 			if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
@@ -546,7 +540,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
-		Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+		Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+			catalogManager.getBuiltInDatabaseName(), name);
 
 		if (table.isPresent()) {
 			if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c5d0bc7..5933487 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -64,8 +64,8 @@ public class CatalogManager {
 
 	private String currentDatabaseName;
 
-	// The name of the default catalog
-	private final String defaultCatalogName;
+	// The name of the built-in catalog
+	private final String builtInCatalogName;
 
 	/**
 	 * Temporary solution to handle both {@link CatalogBaseTable} and
@@ -128,7 +128,9 @@ public class CatalogManager {
 		catalogs.put(defaultCatalogName, defaultCatalog);
 		this.currentCatalogName = defaultCatalogName;
 		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
-		this.defaultCatalogName = defaultCatalogName;
+
+		// right now the default catalog is always the built-in one
+		this.builtInCatalogName = defaultCatalogName;
 	}
 
 	/**
@@ -298,12 +300,24 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Gets the default catalog name.
+	 * Gets the built-in catalog name. The built-in catalog is used for storing all non-serializable
+	 * transient meta-objects.
+	 *
+	 * @return the built-in catalog name
+	 */
+	public String getBuiltInCatalogName() {
+		return builtInCatalogName;
+	}
+
+	/**
+	 * Gets the built-in database name in the built-in catalog. The built-in database is used for storing
+	 * all non-serializable transient meta-objects.
 	 *
-	 * @return the default catalog
+	 * @return the built-in database name
 	 */
-	public String getDefaultCatalogName() {
-		return defaultCatalogName;
+	public String getBuiltInDatabaseName() {
+		// The default database of the built-in catalog is also the built-in database.
+		return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
 	}
 
 	/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 01639b8..1faffde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -150,9 +150,7 @@ public class FunctionCatalog implements FunctionLookup {
 				.map(FunctionDefinition::toString)
 				.collect(Collectors.toList()));
 
-		return result.stream()
-			.collect(Collectors.toList())
-			.toArray(new String[0]);
+		return result.toArray(new String[0]);
 	}
 
 	@Override
@@ -180,7 +178,7 @@ public class FunctionCatalog implements FunctionLookup {
 						userCandidate)
 				);
 			} else {
-				// TODO: should go thru function definition discover service
+				// TODO: should go through function definition discover service
 			}
 		} catch (FunctionNotExistException e) {
 			// Ignore
@@ -206,10 +204,11 @@ public class FunctionCatalog implements FunctionLookup {
 				.map(Function.identity());
 		}
 
-		String defaultCatalogName = catalogManager.getDefaultCatalogName();
-
 		return foundDefinition.map(definition -> new FunctionLookup.Result(
-			ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+			ObjectIdentifier.of(
+				catalogManager.getBuiltInCatalogName(),
+				catalogManager.getBuiltInDatabaseName(),
+				name),
 			definition)
 		);
 	}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 6b3ecbc..516e6b1 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -295,11 +295,11 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
-      val defaultCatalog = "default_catalog"
+      val builtInCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
         "default_catalog",
         new GenericInMemoryCatalog(
-          defaultCatalog,
+          builtInCatalog,
           "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 79631f3..0923bba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -76,9 +76,10 @@ class FlinkRelMdHandlerTestBase {
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
 
-  val defaultCatalog = "default_catalog"
+  val builtinCatalog = "default_catalog"
+  val builtinDatabase = "default_database"
   val catalogManager = new CatalogManager(
-    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+    builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
 
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 021a732..b75426c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -60,10 +60,6 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  // The current catalog and database are definitely builtin.
-  protected val builtinCatalogName: String = catalogManager.getCurrentCatalog
-  protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
-
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
 
@@ -266,7 +262,9 @@ abstract class TableEnvImpl(
     tableSource: TableSource[_])
   : Unit = {
     // register
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName, name) match {
 
       // check if a table (source or sink) is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -293,7 +291,10 @@ abstract class TableEnvImpl(
     tableSink: TableSink[_])
   : Unit = {
     // check if a table (source or sink) is registered
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName,
+      name) match {
 
       // table source and/or sink is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -352,27 +353,29 @@ abstract class TableEnvImpl(
 
   protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.createTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }
 
   protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.alterTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }
 


[flink] 03/06: [FLINK-13350][table-api-java] Annotate useCatalog & useDatabase with experimental annotation

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d37909e3bc33bca2a5dbb4eda10951a52fd231f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 24 09:12:44 2019 +0200

    [FLINK-13350][table-api-java] Annotate useCatalog & useDatabase with experimental annotation
---
 .../src/main/java/org/apache/flink/annotation/Experimental.java        | 2 +-
 .../src/main/java/org/apache/flink/table/api/TableEnvironment.java     | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
index 7ddcb1a..d777452 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/Experimental.java
@@ -34,7 +34,7 @@ import java.lang.annotation.Target;
  *
  */
 @Documented
-@Target(ElementType.TYPE)
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
 @Public
 public @interface Experimental {
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index c396c53..d7e9d7e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -478,6 +479,7 @@ public interface TableEnvironment {
 	 * @param catalogName The name of the catalog to set as the current default catalog.
 	 * @see TableEnvironment#useDatabase(String)
 	 */
+	@Experimental
 	void useCatalog(String catalogName);
 
 	/**
@@ -544,6 +546,7 @@ public interface TableEnvironment {
 	 * @param databaseName The name of the database to set as the current database.
 	 * @see TableEnvironment#useCatalog(String)
 	 */
+	@Experimental
 	void useDatabase(String databaseName);
 
 	/**


[flink] 04/06: [hotfix][table-common] Fix logging in the table source validation procedure

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit daee14f8a54a6e650c57ed93d960d505793f2c67
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 24 09:54:33 2019 +0200

    [hotfix][table-common] Fix logging in the table source validation procedure
---
 .../java/org/apache/flink/table/sources/TableSourceValidation.java  | 6 ++++--
 .../flink/table/plan/nodes/datastream/StreamTableSourceScan.scala   | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
index d93b87c..553c675 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
@@ -269,8 +269,10 @@ public class TableSourceValidation {
 			lookupFieldType(
 				producedDataType,
 				fieldName,
-				String.format("Table field '%s' was not found in the return type $returnType of the " +
-					"TableSource.", fieldName)));
+				String.format(
+					"Table field '%s' was not found in the return type %s of the TableSource.",
+					fieldName,
+					producedDataType)));
 	}
 
 	/** Look up a field by name in a {@link DataType}. */
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 77a1eae..ab2ef0f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -103,7 +103,7 @@ class StreamTableSourceScan(
 
     // check that declared and actual type of table source DataStream are identical
     if (inputDataType != producedDataType) {
-      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getName} " +
         s"returned a DataStream of data type $inputDataType that does not match with the " +
         s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " +
         s"Please validate the implementation of the TableSource.")


[flink] 06/06: [hotfix][sql-client] Add USE CATALOG/USE to CLI help

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 549079c0b565d10faf07272bdbe046a9b451b086
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jul 30 11:13:06 2019 +0200

    [hotfix][sql-client] Add USE CATALOG/USE to CLI help
---
 .../org/apache/flink/table/client/cli/CliClient.java |  2 +-
 .../apache/flink/table/client/cli/CliStrings.java    | 20 +++++++++++---------
 .../flink/table/client/cli/SqlCommandParser.java     |  2 +-
 .../flink/table/client/cli/SqlCommandParserTest.java |  2 +-
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index eec69bc..d1fed89 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -278,7 +278,7 @@ public class CliClient {
 			case USE_CATALOG:
 				callUseCatalog(cmdCall);
 				break;
-			case USE_DATABASE:
+			case USE:
 				callUseDatabase(cmdCall);
 				break;
 			case DESCRIBE:
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index d68f56b..930ebbc 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -41,20 +41,22 @@ public final class CliStrings {
 
 	public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
 		.append("The following commands are available:\n\n")
-		.append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client."))
 		.append(formatCommand(SqlCommand.CLEAR, "Clears the current terminal."))
-		.append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-		.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-		.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions."))
+		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
 		.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
+		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
 		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
-		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+		.append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
 		.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+		.append(formatCommand(SqlCommand.QUIT, "Quits the SQL CLI client."))
 		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
+		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+		.append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all registered user-defined functions."))
+		.append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
+		.append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+		.append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
 		.style(AttributedStyle.DEFAULT.underline())
 		.append("\nHint")
 		.style(AttributedStyle.DEFAULT)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 049390a..d4026d7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -103,7 +103,7 @@ public final class SqlCommandParser {
 			"USE\\s+CATALOG\\s+(.*)",
 			SINGLE_OPERAND),
 
-		USE_DATABASE(
+		USE(
 			"USE\\s+(?!CATALOG)(.*)",
 			SINGLE_OPERAND),
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 4f903ee..fc5b8b3 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -75,7 +75,7 @@ public class SqlCommandParserTest {
 		testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"}));
 		testInvalidSqlCommand("source"); // missing path
 		testValidSqlCommand("USE CATALOG default", new SqlCommandCall(SqlCommand.USE_CATALOG, new String[]{"default"}));
-		testValidSqlCommand("use default", new SqlCommandCall(SqlCommand.USE_DATABASE, new String[] {"default"}));
+		testValidSqlCommand("use default", new SqlCommandCall(SqlCommand.USE, new String[] {"default"}));
 		testInvalidSqlCommand("use catalog");
 	}
 


[flink] 01/06: [hotfix][table-api-java] Improve documentation of withBuiltinCatalogName & withBuiltinDatabaseName

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 208fbead6aff45751e77fc0aac084e0244466d4f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 18 14:42:23 2019 +0200

    [hotfix][table-api-java] Improve documentation of withBuiltinCatalogName & withBuiltinDatabaseName
---
 .../flink/table/api/EnvironmentSettings.java       | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 8289bbc..bfd9203 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
 
 import javax.annotation.Nullable;
 
@@ -212,7 +214,14 @@ public class EnvironmentSettings {
 
 		/**
 		 * Specifies the name of the initial catalog to be created when instantiating
-		 * a {@link TableEnvironment}. Default: "default_catalog".
+		 * a {@link TableEnvironment}. This catalog will be used to store all
+		 * non-serializable objects such as tables and functions registered via e.g.
+		 * {@link TableEnvironment#registerTableSink(String, TableSink)} or
+		 * {@link TableEnvironment#registerFunction(String, ScalarFunction)}. It will
+		 * also be the initial value for the current catalog which can be altered via
+		 * {@link TableEnvironment#useCatalog(String)}.
+		 *
+		 * <p>Default: "default_catalog".
 		 */
 		public Builder withBuiltInCatalogName(String builtInCatalogName) {
 			this.builtInCatalogName = builtInCatalogName;
@@ -220,8 +229,15 @@ public class EnvironmentSettings {
 		}
 
 		/**
-		 * Specifies the name of the default database in the initial catalog to be created when instantiating
-		 * a {@link TableEnvironment}. Default: "default_database".
+		 * Specifies the name of the default database in the initial catalog to be
+		 * created when instantiating a {@link TableEnvironment}. The database will be
+		 * used to store all non-serializable objects such as tables and functions registered
+		 * via e.g. {@link TableEnvironment#registerTableSink(String, TableSink)} or
+		 * {@link TableEnvironment#registerFunction(String, ScalarFunction)}. It will
+		 * also be the initial value for the current database which can be altered via
+		 * {@link TableEnvironment#useDatabase(String)}.
+		 *
+		 * <p>Default: "default_database".
 		 */
 		public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
 			this.builtInDatabaseName = builtInDatabaseName;


[flink] 05/06: [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1db13f5cfb3b81409d6f6fb079424f44ccc826e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 24 11:21:00 2019 +0200

    [FLINK-13279][table-sql-client] Fully qualify sink name in sql-client
    
    This closes #9229.
---
 .../client/gateway/local/ExecutionContext.java     |  28 ++---
 .../table/client/gateway/local/LocalExecutor.java  |   7 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  23 +++-
 .../client/gateway/utils/SimpleCatalogFactory.java | 118 +++++++++++++++++++++
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../test/resources/test-sql-client-defaults.yaml   |   3 +
 .../flink/table/api/EnvironmentSettings.java       |   7 +-
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0df7fba..ae7fb2c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -302,20 +302,6 @@ public class ExecutionContext<T> {
 			// register catalogs
 			catalogs.forEach(tableEnv::registerCatalog);
 
-			// set current catalog
-			if (sessionContext.getCurrentCatalog().isPresent()) {
-				tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
-			} else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
-				tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
-			}
-
-			// set current database
-			if (sessionContext.getCurrentDatabase().isPresent()) {
-				tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
-			} else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
-				tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
-			}
-
 			// create query config
 			queryConfig = createQueryConfig();
 
@@ -340,6 +326,20 @@ public class ExecutionContext<T> {
 					registerTemporalTable(temporalTableEntry);
 				}
 			});
+
+			// set current catalog
+			if (sessionContext.getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+			} else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
+			}
+
+			// set current database
+			if (sessionContext.getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+			} else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
+			}
 		}
 
 		public QueryConfig getQueryConfig() {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 7e41f1f..101f72c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
@@ -473,7 +474,11 @@ public class LocalExecutor implements Executor {
 			// writing to a sink requires an optimization step that might reference UDFs during code compilation
 			context.wrapClassLoader(() -> {
 				envInst.getTableEnvironment().registerTableSink(jobName, result.getTableSink());
-				table.insertInto(jobName, envInst.getQueryConfig());
+				table.insertInto(
+					envInst.getQueryConfig(),
+					EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+					EnvironmentSettings.DEFAULT_BUILTIN_DATABASE,
+					jobName);
 				return null;
 			});
 			jobGraph = envInst.createJobGraph(jobName);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ac1a7ae..5dbec41 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
+import org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
@@ -63,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -160,7 +162,8 @@ public class LocalExecutorITCase extends TestLogger {
 
 		final List<String> expectedCatalogs = Arrays.asList(
 			"default_catalog",
-			"catalog1");
+			"catalog1",
+			"simple-catalog");
 		assertEquals(expectedCatalogs, actualCatalogs);
 	}
 
@@ -402,7 +405,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final SessionContext session = new SessionContext("test-session", new Environment());
 
 		try {
-			// start job
+			// Case 1: Registered sink
 			final ProgramTargetDescriptor targetDescriptor = executor.executeUpdate(
 				session,
 				"INSERT INTO TableSourceSink SELECT IntegerField1 = 42, StringField1 FROM TableNumber1");
@@ -424,6 +427,22 @@ public class LocalExecutorITCase extends TestLogger {
 						fail("Unexpected job status.");
 				}
 			}
+
+			// Case 2: Temporary sink
+			session.setCurrentCatalog("simple-catalog");
+			session.setCurrentDatabase("default_database");
+			// all queries are pipelined to an in-memory sink, check it is properly registered
+			final ResultDescriptor otherCatalogDesc = executor.executeQuery(session, "SELECT * FROM `test-table`");
+
+			final List<String> otherCatalogResults = retrieveTableResult(
+				executor,
+				session,
+				otherCatalogDesc.getResultId());
+
+			TestBaseUtils.compareResultCollections(
+				SimpleCatalogFactory.TABLE_CONTENTS.stream().map(Row::toString).collect(Collectors.toList()),
+				otherCatalogResults,
+				Comparator.naturalOrder());
 		} finally {
 			executor.stop(session);
 		}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
new file mode 100644
index 0000000..5f533fb
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/SimpleCatalogFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ConnectorCatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.CatalogDescriptorValidator;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Catalog factory for an in-memory catalog that contains a single non-empty table.
+ * The contents of the table are equal to {@link SimpleCatalogFactory#TABLE_CONTENTS}.
+ */
+public class SimpleCatalogFactory implements CatalogFactory {
+
+	public static final String CATALOG_TYPE_VALUE = "simple-catalog";
+
+	public static final String TEST_TABLE_NAME = "test-table";
+
+	public static final List<Row> TABLE_CONTENTS = Arrays.asList(
+		Row.of(1, "Hello"),
+		Row.of(2, "Hello world"),
+		Row.of(3, "Hello world! Hello!")
+	);
+
+	@Override
+	public Catalog createCatalog(String name, Map<String, String> properties) {
+		String database = properties.getOrDefault(
+			CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE,
+			"default_database");
+		GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog(name, database);
+
+		String tableName = properties.getOrDefault(TEST_TABLE_NAME, TEST_TABLE_NAME);
+		StreamTableSource<Row> tableSource = new StreamTableSource<Row>() {
+			@Override
+			public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+				return execEnv.fromCollection(TABLE_CONTENTS)
+					.returns(new RowTypeInfo(
+						new TypeInformation[]{Types.INT(), Types.STRING()},
+						new String[]{"id", "string"}));
+			}
+
+			@Override
+			public TableSchema getTableSchema() {
+				return TableSchema.builder()
+					.field("id", DataTypes.INT())
+					.field("string", DataTypes.STRING())
+					.build();
+			}
+
+			@Override
+			public DataType getProducedDataType() {
+				return DataTypes.ROW(
+					DataTypes.FIELD("id", DataTypes.INT()),
+					DataTypes.FIELD("string", DataTypes.STRING())
+				);
+			}
+		};
+
+		try {
+			genericInMemoryCatalog.createTable(
+				new ObjectPath(database, tableName),
+				ConnectorCatalogTable.source(tableSource, false),
+				false
+			);
+		} catch (Exception e) {
+			throw new WrappingRuntimeException(e);
+		}
+
+		return genericInMemoryCatalog;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CatalogDescriptorValidator.CATALOG_TYPE, CATALOG_TYPE_VALUE);
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Arrays.asList(CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE, TEST_TABLE_NAME);
+	}
+}
diff --git a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index b4e3095..5ba6b0b 100644
--- a/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-sql-client/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -15,5 +15,6 @@
 
 org.apache.flink.table.client.gateway.utils.DummyTableSinkFactory
 org.apache.flink.table.client.gateway.utils.DummyTableSourceFactory
+org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestCatalogFactory
 org.apache.flink.table.client.gateway.local.DependencyTest$TestHiveCatalogFactory
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9e0582b..9844d54 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -137,3 +137,6 @@ deployment:
 catalogs:
   - name: catalog1
     type: DependencyTest
+  - name: simple-catalog
+    type: simple-catalog
+    test-table: test-table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index bfd9203..7eec4a3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -46,8 +46,11 @@ import java.util.Map;
  */
 @PublicEvolving
 public class EnvironmentSettings {
+
 	public static final String STREAMING_MODE = "streaming-mode";
 	public static final String CLASS_NAME = "class-name";
+	public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
+	public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
 
 	/**
 	 * Canonical name of the {@link Planner} class to use.
@@ -158,8 +161,8 @@ public class EnvironmentSettings {
 
 		private String plannerClass = OLD_PLANNER_FACTORY;
 		private String executorClass = OLD_EXECUTOR_FACTORY;
-		private String builtInCatalogName = "default_catalog";
-		private String builtInDatabaseName = "default_database";
+		private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
+		private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
 		private boolean isStreamingMode = true;
 
 		/**