You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/05/25 02:04:39 UTC
[flink] branch release-1.17 updated: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set statement (#22633)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 23030f6546a [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set statement (#22633)
23030f6546a is described below
commit 23030f6546a5f5877166ee1dc6f49dd18f4dc188
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Thu May 25 10:04:30 2023 +0800
[FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set statement (#22633)
* [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set statement
---
.../gateway/service/context/SessionContext.java | 3 +-
.../service/operation/OperationExecutor.java | 1 -
.../gateway/AbstractSqlGatewayStatementITCase.java | 3 +
.../service/context/SessionContextTest.java | 6 --
.../flink-sql-gateway/src/test/resources/sql/set.q | 68 ++++++++++++++++++++++
5 files changed, 73 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index d9dddf20a36..81055fc05b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -135,7 +135,8 @@ public class SessionContext {
public void set(String key, String value) {
try {
// Test whether the key value will influence the creation of the Executor.
- createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+ createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)))
+ .getTableEnvironment();
} catch (Exception e) {
// get error and reset the key with old value
throw new SqlExecutionException(
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 6ba1b544500..22c4ffcdf32 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -301,7 +301,6 @@ public class OperationExecutor {
// --------------------------------------------------------------------------------------------
- @VisibleForTesting
public TableEnvironmentInternal getTableEnvironment() {
// checks the value of RUNTIME_MODE
Configuration operationConfig = sessionContext.getSessionConf().clone();
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 890e55acc02..0ba90189172 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -65,6 +65,7 @@ import java.util.jar.JarFile;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static org.apache.flink.configuration.RestOptions.PORT;
import static org.apache.flink.table.gateway.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
@@ -121,6 +122,8 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
replaceVars.put(
"$VAR_BATCH_CTAS_PATH",
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
+ replaceVars.put(
+ "$VAR_REST_PORT", MINI_CLUSTER.getClientConfiguration().get(PORT).toString());
}
@TestTemplate
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index bc6976b887c..8115379b7c0 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -64,15 +64,12 @@ class SessionContextTest {
@Test
public void testSetAndResetOption() {
- // table config option
- sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
// runtime config option
sessionContext.set(MAX_PARALLELISM.key(), "128");
// runtime config option and doesn't have default value
sessionContext.set(NAME.key(), "test");
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
- assertThat(sessionContext.getSessionConf().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(sessionContext.getSessionConf().get(MAX_PARALLELISM)).isEqualTo(128);
assertThat(sessionContext.getSessionConf().get(NAME)).isEqualTo("test");
assertThat(sessionContext.getSessionConf().get(OBJECT_REUSE)).isFalse();
@@ -89,8 +86,6 @@ class SessionContextTest {
@Test
public void testSetAndResetKeyInConfigOptions() {
- // table config option
- sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
// runtime config option
sessionContext.set(MAX_PARALLELISM.key(), "128");
// runtime config option and doesn't have default value
@@ -98,7 +93,6 @@ class SessionContextTest {
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
- assertThat(sessionContext.getSessionConf().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(sessionContext.getSessionConf().get(MAX_PARALLELISM)).isEqualTo(128);
assertThat(sessionContext.getSessionConf().get(NAME)).isEqualTo("test");
assertThat(sessionContext.getSessionConf().get(OBJECT_REUSE)).isFalse();
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
new file mode 100644
index 00000000000..c2e772c230a
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
@@ -0,0 +1,68 @@
+# set.q - SET, RESET
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+reset table.resources.download-dir;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
+set;
+!output
++--------------------------------------------+-----------+
+| key | value |
++--------------------------------------------+-----------+
+| execution.attached | true |
+| execution.savepoint-restore-mode | NO_CLAIM |
+| execution.savepoint.ignore-unclaimed-state | false |
+| execution.shutdown-on-attached-exit | false |
+| execution.target | remote |
+| jobmanager.rpc.address | localhost |
+| pipeline.classpaths | |
+| pipeline.jars | |
+| rest.port | $VAR_REST_PORT |
++--------------------------------------------+-----------+
+9 rows in set
+!ok
+
+# set illegal value
+set 'table.sql-dialect' = 'unknown';
+!output
+java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.UNKNOWN
+!error
+
+set;
+!output
++--------------------------------------------+-----------+
+| key | value |
++--------------------------------------------+-----------+
+| execution.attached | true |
+| execution.savepoint-restore-mode | NO_CLAIM |
+| execution.savepoint.ignore-unclaimed-state | false |
+| execution.shutdown-on-attached-exit | false |
+| execution.target | remote |
+| jobmanager.rpc.address | localhost |
+| pipeline.classpaths | |
+| pipeline.jars | |
+| rest.port | $VAR_REST_PORT |
++--------------------------------------------+-----------+
+9 rows in set
+!ok