You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/19 07:04:31 UTC
[flink] branch release-1.11 updated: [FLINK-17786][sql-client] Fix can not switch dialect in SQL CLI
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 90982ca [FLINK-17786][sql-client] Fix can not switch dialect in SQL CLI
90982ca is described below
commit 90982ca1a9595bdadfb3433fbfdbabe6097e254d
Author: Rui Li <li...@apache.org>
AuthorDate: Tue May 19 15:01:41 2020 +0800
[FLINK-17786][sql-client] Fix can not switch dialect in SQL CLI
Remove dialect from ExecutionEntry
This closes #12217
---
.../client/config/entries/ExecutionEntry.java | 15 -----------
.../client/gateway/local/ExecutionContext.java | 1 -
.../client/gateway/local/ExecutionContextTest.java | 17 -------------
.../client/gateway/local/LocalExecutorITCase.java | 29 ++++++++++++++++++++++
.../test/resources/test-sql-client-dialect.yaml | 6 +----
5 files changed, 30 insertions(+), 38 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index dd7c322..78be7f5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.client.config.ConfigUtil;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -111,8 +110,6 @@ public class ExecutionEntry extends ConfigEntry {
public static final String EXECUTION_CURRENT_DATABASE = "current-database";
- public static final String EXECUTION_SQL_DIALECT = "dialect";
-
private ExecutionEntry(DescriptorProperties properties) {
super(properties);
}
@@ -157,12 +154,6 @@ public class ExecutionEntry extends ConfigEntry {
properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, true, 1);
properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
- properties.validateEnumValues(EXECUTION_SQL_DIALECT,
- true,
- Arrays.asList(
- SqlDialect.DEFAULT.name().toLowerCase(),
- SqlDialect.HIVE.name().toLowerCase()
- ));
}
public EnvironmentSettings getEnvironmentSettings() {
@@ -339,12 +330,6 @@ public class ExecutionEntry extends ConfigEntry {
.orElse(false);
}
- public SqlDialect getSqlDialect() {
- return properties.getOptionalString(EXECUTION_SQL_DIALECT)
- .map(name -> SqlDialect.valueOf(name.toUpperCase()))
- .orElse(SqlDialect.DEFAULT);
- }
-
public Map<String, String> asTopLevelMap() {
return properties.asPrefixedMap(EXECUTION_ENTRY + '.');
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 6a5b996..4396ba9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -456,7 +456,6 @@ public class ExecutionContext<ClusterID> {
config.addConfiguration(flinkConfig);
environment.getConfiguration().asMap().forEach((k, v) ->
config.getConfiguration().setString(k, v));
- config.setSqlDialect(environment.getExecution().getSqlDialect());
if (noInheritedState) {
//--------------------------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index b68ebef..f44513e 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.python.PythonFunctionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -74,7 +73,6 @@ public class ExecutionContextTest {
public static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-client-configuration.yaml";
- private static final String DIALECT_ENVIRONMENT_FILE = "test-sql-client-dialect.yaml";
private static final String FUNCTION_ENVIRONMENT_FILE = "test-sql-client-python-functions.yaml";
@Test
@@ -307,21 +305,6 @@ public class ExecutionContextTest {
Collections.singletonList(new DefaultCLI(flinkConfig))).build();
}
- @Test
- public void testSQLDialect() throws Exception {
- ExecutionContext<?> context = createDefaultExecutionContext();
- assertEquals(SqlDialect.DEFAULT, context.getTableEnvironment().getConfig().getSqlDialect());
-
- Map<String, String> replaceVars = new HashMap<>();
- replaceVars.put("$VAR_DIALECT", "default");
- context = createExecutionContext(DIALECT_ENVIRONMENT_FILE, replaceVars);
- assertEquals(SqlDialect.DEFAULT, context.getTableEnvironment().getConfig().getSqlDialect());
-
- replaceVars.put("$VAR_DIALECT", "hive");
- context = createExecutionContext(DIALECT_ENVIRONMENT_FILE, replaceVars);
- assertEquals(SqlDialect.HIVE, context.getTableEnvironment().getConfig().getSqlDialect());
- }
-
@SuppressWarnings("unchecked")
private <T> ExecutionContext<T> createExecutionContext(String file, Map<String, String> replaceVars) throws Exception {
final Environment env = EnvironmentFileUtil.parseModified(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index cb96387..f7b56d4 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -32,8 +32,10 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -107,6 +109,7 @@ public class LocalExecutorITCase extends TestLogger {
}
private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
+ private static final String DIALECT_ENVIRONMENT_FILE = "test-sql-client-dialect.yaml";
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
@@ -1304,6 +1307,32 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test
+ public void testSQLDialect() throws Exception {
+ LocalExecutor executor = createDefaultExecutor(clusterClient);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ String sessionId = executor.openSession(session);
+ // by default to use DEFAULT dialect
+ assertEquals(SqlDialect.DEFAULT, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
+ // test switching dialect
+ executor.setSessionProperty(sessionId, TableConfigOptions.TABLE_SQL_DIALECT.key(), "hive");
+ assertEquals(SqlDialect.HIVE, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
+ executor.closeSession(sessionId);
+
+ Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_DIALECT", "default");
+ executor = createModifiedExecutor(DIALECT_ENVIRONMENT_FILE, clusterClient, replaceVars);
+ sessionId = executor.openSession(session);
+ assertEquals(SqlDialect.DEFAULT, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
+ executor.closeSession(sessionId);
+
+ replaceVars.put("$VAR_DIALECT", "hive");
+ executor = createModifiedExecutor(DIALECT_ENVIRONMENT_FILE, clusterClient, replaceVars);
+ sessionId = executor.openSession(session);
+ assertEquals(SqlDialect.HIVE, executor.getExecutionContext(sessionId).getTableEnvironment().getConfig().getSqlDialect());
+ executor.closeSession(sessionId);
+ }
+
private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-dialect.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-dialect.yaml
index d4fa4bf..a60be03 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-dialect.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-dialect.yaml
@@ -25,10 +25,6 @@ execution:
planner: blink
type: batch
result-mode: table
- dialect: "$VAR_DIALECT"
configuration:
- table.exec.sort.default-limit: 100
- table.exec.spill-compression.enabled: true
- table.exec.spill-compression.block-size: 128kb
- table.optimizer.join-reorder-enabled: true
+ table.sql-dialect: "$VAR_DIALECT"