You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 17:50:12 UTC

[flink] branch master updated: [FLINK-12679][sql-client] Support 'default-database' config for catalog entries in SQL CLI yaml file

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b7577c  [FLINK-12679][sql-client] Support 'default-database' config for catalog entries in SQL CLI yaml file
8b7577c is described below

commit 8b7577cfafa067ca719aac64695c2b5acc8d56f2
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue May 28 16:09:56 2019 -0700

    [FLINK-12679][sql-client] Support 'default-database' config for catalog entries in SQL CLI yaml file
---
 .../table/client/gateway/local/DependencyTest.java | 20 ++++++++++++++----
 .../client/gateway/local/ExecutionContextTest.java |  4 ++--
 .../test/resources/test-sql-client-catalogs.yaml   |  7 +++++--
 .../flink/table/descriptors/CatalogDescriptor.java | 24 ++++++++++++++++++++++
 .../descriptors/CatalogDescriptorValidator.java    |  6 ++++++
 5 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 109246c..7730b0d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -41,7 +41,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
 import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static org.junit.Assert.assertEquals;
 
@@ -125,6 +127,7 @@ public class DependencyTest {
 		public List<String> supportedProperties() {
 			final List<String> properties = new ArrayList<>();
 			properties.add(TEST_PROPERTY);
+			properties.add(CATALOG_DEFAULT_DATABASE);
 			return properties;
 		}
 
@@ -132,7 +135,14 @@ public class DependencyTest {
 		public Catalog createCatalog(String name, Map<String, String> properties) {
 			final DescriptorProperties params = new DescriptorProperties(true);
 			params.putProperties(properties);
-			return new TestCatalog(name);
+
+			final Optional<String> defaultDatabase = params.getOptionalString(CATALOG_DEFAULT_DATABASE);
+
+			if (defaultDatabase.isPresent()) {
+				return new TestCatalog(name, defaultDatabase.get());
+			} else {
+				return new TestCatalog(name);
+			}
 		}
 	}
 
@@ -141,10 +151,12 @@ public class DependencyTest {
 	 */
 	public static class TestCatalog extends GenericInMemoryCatalog {
 
-		private static final String TEST_DATABASE_NAME = "mydatabase";
-
 		public TestCatalog(String name) {
-			super(name, TEST_DATABASE_NAME);
+			super(name);
+		}
+
+		public TestCatalog(String name, String defaultDatabase) {
+			super(name, defaultDatabase);
 		}
 	}
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 21cf7d0..3783f37 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -73,12 +73,12 @@ public class ExecutionContextTest {
 
 	@Test
 	public void testCatalogs() throws Exception {
-		final String catalogName = "catalog1";
+		final String catalogName = "catalog2";
 		final ExecutionContext<?> context = createCatalogExecutionContext();
 		final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
 
 		assertEquals(tableEnv.getCurrentCatalog(), catalogName);
-		assertEquals(tableEnv.getCurrentDatabase(), "mydatabase");
+		assertEquals(tableEnv.getCurrentDatabase(), "test-default-database");
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index e915930..324ae38 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -112,8 +112,8 @@ execution:
     max-failures-per-interval: 10
     failure-rate-interval: 99000
     delay: 1000
-  current-catalog: catalog1
-  current-database: mydatabase
+  current-catalog: catalog2
+  current-database: test-default-database
 
 deployment:
   response-timeout: 5000
@@ -121,3 +121,6 @@ deployment:
 catalogs:
   - name: catalog1
     type: DependencyTest
+  - name: catalog2
+    type: DependencyTest
+    default-database: test-default-database
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
index 18b433e..753e63c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.Map;
 
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
 import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 
@@ -35,6 +36,8 @@ public abstract class CatalogDescriptor extends DescriptorBase {
 
 	private final int propertyVersion;
 
+	private final String defaultDatabase;
+
 	/**
 	 * Constructs a {@link CatalogDescriptor}.
 	 *
@@ -42,8 +45,20 @@ public abstract class CatalogDescriptor extends DescriptorBase {
 	 * @param propertyVersion property version for backwards compatibility
 	 */
 	public CatalogDescriptor(String type, int propertyVersion) {
+		this(type, propertyVersion, null);
+	}
+
+	/**
+	 * Constructs a {@link CatalogDescriptor}.
+	 *
+	 * @param type string that identifies this catalog
+	 * @param propertyVersion property version for backwards compatibility
+	 * @param defaultDatabase default database of the catalog
+	 */
+	public CatalogDescriptor(String type, int propertyVersion, String defaultDatabase) {
 		this.type = type;
 		this.propertyVersion = propertyVersion;
+		this.defaultDatabase = defaultDatabase;
 	}
 
 	@Override
@@ -51,10 +66,19 @@ public abstract class CatalogDescriptor extends DescriptorBase {
 		final DescriptorProperties properties = new DescriptorProperties();
 		properties.putString(CATALOG_TYPE, type);
 		properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion);
+
+		if (defaultDatabase != null) {
+			properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
+		}
+
 		properties.putProperties(toCatalogProperties());
 		return properties.asMap();
 	}
 
+	public String getDefaultDatabase() {
+		return defaultDatabase;
+	}
+
 	/**
 	 * Converts this descriptor into a set of catalog properties.
 	 */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
index 723dcb0..a907ac7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
@@ -37,9 +37,15 @@ public abstract class CatalogDescriptorValidator implements DescriptorValidator
 	 */
 	public static final String CATALOG_PROPERTY_VERSION = "property-version";
 
+	/**
+	 * Key for describing the default database of the catalog.
+	 */
+	public static final String CATALOG_DEFAULT_DATABASE = "default-database";
+
 	@Override
 	public void validate(DescriptorProperties properties) {
 		properties.validateString(CATALOG_TYPE, false, 1);
 		properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
+		properties.validateString(CATALOG_DEFAULT_DATABASE, true, 1);
 	}
 }