You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/16 01:58:45 UTC
[flink] branch master updated: [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a740f0c [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
a740f0c is described below
commit a740f0c2799ba48e19acb185c1138115fd63aa65
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed Jul 10 16:44:30 2019 -0700
[FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
This PR supports remembering the current catalog and database that users set in the SQL CLI SessionContext.
This closes #9049.
---
.../client/config/entries/ExecutionEntry.java | 4 +-
.../flink/table/client/gateway/SessionContext.java | 25 ++++++++++++
.../client/gateway/local/ExecutionContext.java | 17 ++++----
.../table/client/gateway/local/LocalExecutor.java | 19 +++++++--
.../table/client/gateway/local/DependencyTest.java | 22 ++++++++++-
.../client/gateway/local/LocalExecutorITCase.java | 46 ++++++++++++++++++++++
6 files changed, 119 insertions(+), 14 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index a1d47a0..80d3efb 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -97,9 +97,9 @@ public class ExecutionEntry extends ConfigEntry {
private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
- private static final String EXECUTION_CURRNET_CATALOG = "current-catalog";
+ public static final String EXECUTION_CURRNET_CATALOG = "current-catalog";
- private static final String EXECUTION_CURRNET_DATABASE = "current-database";
+ public static final String EXECUTION_CURRNET_DATABASE = "current-database";
private ExecutionEntry(DescriptorProperties properties) {
super(properties);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index af17941..d2a7da2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -19,13 +19,18 @@
package org.apache.flink.table.client.gateway;
import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
+import org.apache.flink.util.StringUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Context describing a session.
@@ -73,6 +78,26 @@ public class SessionContext {
return name;
}
+ public Optional<String> getCurrentCatalog() {
+ return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG));
+ }
+
+ public void setCurrentCatalog(String currentCatalog) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog));
+
+ sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, currentCatalog);
+ }
+
+ public Optional<String> getCurrentDatabase() {
+ return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE));
+ }
+
+ public void setCurrentDatabase(String currentDatabase) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase));
+
+ sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, currentDatabase);
+ }
+
public Environment getEnvironment() {
return Environment.enrich(
defaultEnvironment,
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 344505f..0df7fba 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -78,7 +78,6 @@ import java.net.URL;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Supplier;
/**
@@ -303,14 +302,18 @@ public class ExecutionContext<T> {
// register catalogs
catalogs.forEach(tableEnv::registerCatalog);
- Optional<String> potentialCurrentCatalog = mergedEnv.getExecution().getCurrentCatalog();
- if (potentialCurrentCatalog.isPresent()) {
- tableEnv.useCatalog(potentialCurrentCatalog.get());
+ // set current catalog
+ if (sessionContext.getCurrentCatalog().isPresent()) {
+ tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+ } else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+ tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
}
- Optional<String> potentialCurrentDatabase = mergedEnv.getExecution().getCurrentDatabase();
- if (potentialCurrentDatabase.isPresent()) {
- tableEnv.useDatabase(potentialCurrentDatabase.get());
+ // set current database
+ if (sessionContext.getCurrentDatabase().isPresent()) {
+ tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+ } else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+ tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
}
// create query config
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index af128ef..7e41f1f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -192,18 +192,24 @@ public class LocalExecutor implements Executor {
@Override
public List<String> listCatalogs(SessionContext session) throws SqlExecutionException {
- final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
- return Arrays.asList(tableEnv.listCatalogs());
+
+ return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listCatalogs()));
}
@Override
public List<String> listDatabases(SessionContext session) throws SqlExecutionException {
- final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+ final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+
+ final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
- return Arrays.asList(tableEnv.listDatabases());
+
+ return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listDatabases()));
}
@Override
@@ -232,7 +238,10 @@ public class LocalExecutor implements Executor {
.getTableEnvironment();
context.wrapClassLoader(() -> {
+ // Rely on TableEnvironment/CatalogManager to validate input
tableEnv.useCatalog(catalogName);
+ session.setCurrentCatalog(catalogName);
+ session.setCurrentDatabase(tableEnv.getCurrentDatabase());
return null;
});
}
@@ -245,7 +254,9 @@ public class LocalExecutor implements Executor {
.getTableEnvironment();
context.wrapClassLoader(() -> {
+ // Rely on TableEnvironment/CatalogManager to validate input
tableEnv.useDatabase(databaseName);
+ session.setCurrentDatabase(databaseName);
return null;
});
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 286d76a..5fabe83 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -20,13 +20,19 @@ package org.apache.flink.table.client.gateway.local;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
@@ -165,6 +171,7 @@ public class DependencyTest {
*/
public static class TestHiveCatalogFactory extends HiveCatalogFactory {
public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database";
+ public static final String TEST_TABLE = "test_table";
@Override
public Map<String, String> requiredContext() {
@@ -193,7 +200,20 @@ public class DependencyTest {
ADDITIONAL_TEST_DATABASE,
new CatalogDatabaseImpl(new HashMap<>(), null),
false);
- } catch (DatabaseAlreadyExistException e) {
+ hiveCatalog.createTable(
+ new ObjectPath(ADDITIONAL_TEST_DATABASE, TEST_TABLE),
+ new CatalogTableImpl(
+ TableSchema.builder()
+ .field("testcol", DataTypes.INT())
+ .build(),
+ new HashMap<String, String>() {{
+ put(CatalogConfig.IS_GENERIC, String.valueOf(true));
+ }},
+ ""
+ ),
+ false
+ );
+ } catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) {
throw new CatalogException(e);
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 023d656..ac1a7ae 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
@@ -75,6 +76,8 @@ import static org.junit.Assert.fail;
public class LocalExecutorITCase extends TestLogger {
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+ private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
+
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@@ -426,6 +429,40 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test
+ public void testUseCatalogAndUseDatabase() throws Exception {
+ final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
+ final URL url = getClass().getClassLoader().getResource("test-data.csv");
+ Objects.requireNonNull(url);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+ replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+
+ final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+
+ try {
+ assertEquals(Arrays.asList("mydatabase"), executor.listDatabases(session));
+
+ executor.useCatalog(session, "hivecatalog");
+
+ assertEquals(
+ Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB),
+ executor.listDatabases(session));
+
+ assertEquals(Collections.emptyList(), executor.listTables(session));
+
+ executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
+
+ assertEquals(Arrays.asList(DependencyTest.TestHiveCatalogFactory.TEST_TABLE), executor.listTables(session));
+ } finally {
+ executor.stop(session);
+ }
+ }
+
private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,
@@ -481,6 +518,15 @@ public class LocalExecutorITCase extends TestLogger {
new DummyCustomCommandLine<T>(clusterClient));
}
+ private <T> LocalExecutor createModifiedExecutor(
+ String yamlFile, ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
+ return new LocalExecutor(
+ EnvironmentFileUtil.parseModified(yamlFile, replaceVars),
+ Collections.emptyList(),
+ clusterClient.getFlinkConfiguration(),
+ new DummyCustomCommandLine<T>(clusterClient));
+ }
+
private List<String> retrieveTableResult(
Executor executor,
SessionContext session,