You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/31 17:50:12 UTC
[flink] branch master updated: [FLINK-12679][sql-client] Support
'default-database' config for catalog entries in SQL CLI yaml file
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8b7577c [FLINK-12679][sql-client] Support 'default-database' config for catalog entries in SQL CLI yaml file
8b7577c is described below
commit 8b7577cfafa067ca719aac64695c2b5acc8d56f2
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue May 28 16:09:56 2019 -0700
[FLINK-12679][sql-client] Support 'default-database' config for catalog entries in SQL CLI yaml file
---
.../table/client/gateway/local/DependencyTest.java | 20 ++++++++++++++----
.../client/gateway/local/ExecutionContextTest.java | 4 ++--
.../test/resources/test-sql-client-catalogs.yaml | 7 +++++--
.../flink/table/descriptors/CatalogDescriptor.java | 24 ++++++++++++++++++++++
.../descriptors/CatalogDescriptorValidator.java | 6 ++++++
5 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 109246c..7730b0d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -41,7 +41,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
import static org.junit.Assert.assertEquals;
@@ -125,6 +127,7 @@ public class DependencyTest {
public List<String> supportedProperties() {
final List<String> properties = new ArrayList<>();
properties.add(TEST_PROPERTY);
+ properties.add(CATALOG_DEFAULT_DATABASE);
return properties;
}
@@ -132,7 +135,14 @@ public class DependencyTest {
public Catalog createCatalog(String name, Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
- return new TestCatalog(name);
+
+ final Optional<String> defaultDatabase = params.getOptionalString(CATALOG_DEFAULT_DATABASE);
+
+ if (defaultDatabase.isPresent()) {
+ return new TestCatalog(name, defaultDatabase.get());
+ } else {
+ return new TestCatalog(name);
+ }
}
}
@@ -141,10 +151,12 @@ public class DependencyTest {
*/
public static class TestCatalog extends GenericInMemoryCatalog {
- private static final String TEST_DATABASE_NAME = "mydatabase";
-
public TestCatalog(String name) {
- super(name, TEST_DATABASE_NAME);
+ super(name);
+ }
+
+ public TestCatalog(String name, String defaultDatabase) {
+ super(name, defaultDatabase);
}
}
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 21cf7d0..3783f37 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -73,12 +73,12 @@ public class ExecutionContextTest {
@Test
public void testCatalogs() throws Exception {
- final String catalogName = "catalog1";
+ final String catalogName = "catalog2";
final ExecutionContext<?> context = createCatalogExecutionContext();
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
assertEquals(tableEnv.getCurrentCatalog(), catalogName);
- assertEquals(tableEnv.getCurrentDatabase(), "mydatabase");
+ assertEquals(tableEnv.getCurrentDatabase(), "test-default-database");
}
@Test
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index e915930..324ae38 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -112,8 +112,8 @@ execution:
max-failures-per-interval: 10
failure-rate-interval: 99000
delay: 1000
- current-catalog: catalog1
- current-database: mydatabase
+ current-catalog: catalog2
+ current-database: test-default-database
deployment:
response-timeout: 5000
@@ -121,3 +121,6 @@ deployment:
catalogs:
- name: catalog1
type: DependencyTest
+ - name: catalog2
+ type: DependencyTest
+ default-database: test-default-database
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
index 18b433e..753e63c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import java.util.Map;
+import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
@@ -35,6 +36,8 @@ public abstract class CatalogDescriptor extends DescriptorBase {
private final int propertyVersion;
+ private final String defaultDatabase;
+
/**
* Constructs a {@link CatalogDescriptor}.
*
@@ -42,8 +45,20 @@ public abstract class CatalogDescriptor extends DescriptorBase {
* @param propertyVersion property version for backwards compatibility
*/
public CatalogDescriptor(String type, int propertyVersion) {
+ this(type, propertyVersion, null);
+ }
+
+ /**
+ * Constructs a {@link CatalogDescriptor}.
+ *
+ * @param type string that identifies this catalog
+ * @param propertyVersion property version for backwards compatibility
+ * @param defaultDatabase default database of the catalog
+ */
+ public CatalogDescriptor(String type, int propertyVersion, String defaultDatabase) {
this.type = type;
this.propertyVersion = propertyVersion;
+ this.defaultDatabase = defaultDatabase;
}
@Override
@@ -51,10 +66,19 @@ public abstract class CatalogDescriptor extends DescriptorBase {
final DescriptorProperties properties = new DescriptorProperties();
properties.putString(CATALOG_TYPE, type);
properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion);
+
+ if (defaultDatabase != null) {
+ properties.putString(CATALOG_DEFAULT_DATABASE, defaultDatabase);
+ }
+
properties.putProperties(toCatalogProperties());
return properties.asMap();
}
+ public String getDefaultDatabase() {
+ return defaultDatabase;
+ }
+
/**
* Converts this descriptor into a set of catalog properties.
*/
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
index 723dcb0..a907ac7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptorValidator.java
@@ -37,9 +37,15 @@ public abstract class CatalogDescriptorValidator implements DescriptorValidator
*/
public static final String CATALOG_PROPERTY_VERSION = "property-version";
+ /**
+ * Key for describing the default database of the catalog.
+ */
+ public static final String CATALOG_DEFAULT_DATABASE = "default-database";
+
@Override
public void validate(DescriptorProperties properties) {
properties.validateString(CATALOG_TYPE, false, 1);
properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
+ properties.validateString(CATALOG_DEFAULT_DATABASE, true, 1);
}
}