You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/16 01:58:45 UTC

[flink] branch master updated: [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a740f0c  [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
a740f0c is described below

commit a740f0c2799ba48e19acb185c1138115fd63aa65
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed Jul 10 16:44:30 2019 -0700

    [FLINK-13176][SQL CLI] remember current catalog and database in SQL CLI SessionContext
    
    This PR makes the SQL CLI remember, in its SessionContext, the current catalog and database that users set.
    
    This closes #9049.
---
 .../client/config/entries/ExecutionEntry.java      |  4 +-
 .../flink/table/client/gateway/SessionContext.java | 25 ++++++++++++
 .../client/gateway/local/ExecutionContext.java     | 17 ++++----
 .../table/client/gateway/local/LocalExecutor.java  | 19 +++++++--
 .../table/client/gateway/local/DependencyTest.java | 22 ++++++++++-
 .../client/gateway/local/LocalExecutorITCase.java  | 46 ++++++++++++++++++++++
 6 files changed, 119 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index a1d47a0..80d3efb 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -97,9 +97,9 @@ public class ExecutionEntry extends ConfigEntry {
 
 	private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
 
-	private static final String EXECUTION_CURRNET_CATALOG = "current-catalog";
+	public static final String EXECUTION_CURRNET_CATALOG = "current-catalog";
 
-	private static final String EXECUTION_CURRNET_DATABASE = "current-database";
+	public static final String EXECUTION_CURRNET_DATABASE = "current-database";
 
 	private ExecutionEntry(DescriptorProperties properties) {
 		super(properties);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index af17941..d2a7da2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -19,13 +19,18 @@
 package org.apache.flink.table.client.gateway;
 
 import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
+import org.apache.flink.util.StringUtils;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Context describing a session.
@@ -73,6 +78,26 @@ public class SessionContext {
 		return name;
 	}
 
+	public Optional<String> getCurrentCatalog() {
+		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG));
+	}
+
+	public void setCurrentCatalog(String currentCatalog) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog));
+
+		sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, currentCatalog);
+	}
+
+	public Optional<String> getCurrentDatabase() {
+		return Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE));
+	}
+
+	public void setCurrentDatabase(String currentDatabase) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase));
+
+		sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, currentDatabase);
+	}
+
 	public Environment getEnvironment() {
 		return Environment.enrich(
 			defaultEnvironment,
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 344505f..0df7fba 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -78,7 +78,6 @@ import java.net.URL;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -303,14 +302,18 @@ public class ExecutionContext<T> {
 			// register catalogs
 			catalogs.forEach(tableEnv::registerCatalog);
 
-			Optional<String> potentialCurrentCatalog = mergedEnv.getExecution().getCurrentCatalog();
-			if (potentialCurrentCatalog.isPresent()) {
-				tableEnv.useCatalog(potentialCurrentCatalog.get());
+			// set current catalog
+			if (sessionContext.getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(sessionContext.getCurrentCatalog().get());
+			} else if (mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
+				tableEnv.useCatalog(mergedEnv.getExecution().getCurrentCatalog().get());
 			}
 
-			Optional<String> potentialCurrentDatabase = mergedEnv.getExecution().getCurrentDatabase();
-			if (potentialCurrentDatabase.isPresent()) {
-				tableEnv.useDatabase(potentialCurrentDatabase.get());
+			// set current database
+			if (sessionContext.getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(sessionContext.getCurrentDatabase().get());
+			} else if (mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
+				tableEnv.useDatabase(mergedEnv.getExecution().getCurrentDatabase().get());
 			}
 
 			// create query config
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index af128ef..7e41f1f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -192,18 +192,24 @@ public class LocalExecutor implements Executor {
 
 	@Override
 	public List<String> listCatalogs(SessionContext session) throws SqlExecutionException {
-		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+
+		final TableEnvironment tableEnv = context
 			.createEnvironmentInstance()
 			.getTableEnvironment();
-		return Arrays.asList(tableEnv.listCatalogs());
+
+		return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listCatalogs()));
 	}
 
 	@Override
 	public List<String> listDatabases(SessionContext session) throws SqlExecutionException {
-		final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+
+		final TableEnvironment tableEnv = context
 			.createEnvironmentInstance()
 			.getTableEnvironment();
-		return Arrays.asList(tableEnv.listDatabases());
+
+		return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listDatabases()));
 	}
 
 	@Override
@@ -232,7 +238,10 @@ public class LocalExecutor implements Executor {
 			.getTableEnvironment();
 
 		context.wrapClassLoader(() -> {
+			// Rely on TableEnvironment/CatalogManager to validate input
 			tableEnv.useCatalog(catalogName);
+			session.setCurrentCatalog(catalogName);
+			session.setCurrentDatabase(tableEnv.getCurrentDatabase());
 			return null;
 		});
 	}
@@ -245,7 +254,9 @@ public class LocalExecutor implements Executor {
 			.getTableEnvironment();
 
 		context.wrapClassLoader(() -> {
+			// Rely on TableEnvironment/CatalogManager to validate input
 			tableEnv.useDatabase(databaseName);
+			session.setCurrentDatabase(databaseName);
 			return null;
 		});
 	}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 286d76a..5fabe83 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -20,13 +20,19 @@ package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
@@ -165,6 +171,7 @@ public class DependencyTest {
 	 */
 	public static class TestHiveCatalogFactory extends HiveCatalogFactory {
 		public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database";
+		public static final String TEST_TABLE = "test_table";
 
 		@Override
 		public Map<String, String> requiredContext() {
@@ -193,7 +200,20 @@ public class DependencyTest {
 					ADDITIONAL_TEST_DATABASE,
 					new CatalogDatabaseImpl(new HashMap<>(), null),
 					false);
-			} catch (DatabaseAlreadyExistException e) {
+				hiveCatalog.createTable(
+					new ObjectPath(ADDITIONAL_TEST_DATABASE, TEST_TABLE),
+					new CatalogTableImpl(
+						TableSchema.builder()
+							.field("testcol", DataTypes.INT())
+							.build(),
+						new HashMap<String, String>() {{
+							put(CatalogConfig.IS_GENERIC, String.valueOf(true));
+						}},
+						""
+					),
+					false
+				);
+			} catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) {
 				throw new CatalogException(e);
 			}
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 023d656..ac1a7ae 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
@@ -75,6 +76,8 @@ import static org.junit.Assert.fail;
 public class LocalExecutorITCase extends TestLogger {
 
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+	private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
+
 	private static final int NUM_TMS = 2;
 	private static final int NUM_SLOTS_PER_TM = 2;
 
@@ -426,6 +429,40 @@ public class LocalExecutorITCase extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testUseCatalogAndUseDatabase() throws Exception {
+		final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+		replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
+		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
+
+		final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		try {
+			assertEquals(Arrays.asList("mydatabase"), executor.listDatabases(session));
+
+			executor.useCatalog(session, "hivecatalog");
+
+			assertEquals(
+				Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB),
+				executor.listDatabases(session));
+
+			assertEquals(Collections.emptyList(), executor.listTables(session));
+
+			executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
+
+			assertEquals(Arrays.asList(DependencyTest.TestHiveCatalogFactory.TEST_TABLE), executor.listTables(session));
+		} finally {
+			executor.stop(session);
+		}
+	}
+
 	private void executeStreamQueryTable(
 			Map<String, String> replaceVars,
 			String query,
@@ -481,6 +518,15 @@ public class LocalExecutorITCase extends TestLogger {
 			new DummyCustomCommandLine<T>(clusterClient));
 	}
 
+	private <T> LocalExecutor createModifiedExecutor(
+			String yamlFile, ClusterClient<T> clusterClient, Map<String, String> replaceVars) throws Exception {
+		return new LocalExecutor(
+			EnvironmentFileUtil.parseModified(yamlFile, replaceVars),
+			Collections.emptyList(),
+			clusterClient.getFlinkConfiguration(),
+			new DummyCustomCommandLine<T>(clusterClient));
+	}
+
 	private List<String> retrieveTableResult(
 			Executor executor,
 			SessionContext session,