You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/05/25 02:04:39 UTC

[flink] branch release-1.17 updated: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state… (#22633)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 23030f6546a [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state… (#22633)
23030f6546a is described below

commit 23030f6546a5f5877166ee1dc6f49dd18f4dc188
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Thu May 25 10:04:30 2023 +0800

    [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state… (#22633)
    
    * [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set statement
---
 .../gateway/service/context/SessionContext.java    |  3 +-
 .../service/operation/OperationExecutor.java       |  1 -
 .../gateway/AbstractSqlGatewayStatementITCase.java |  3 +
 .../service/context/SessionContextTest.java        |  6 --
 .../flink-sql-gateway/src/test/resources/sql/set.q | 68 ++++++++++++++++++++++
 5 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index d9dddf20a36..81055fc05b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -135,7 +135,8 @@ public class SessionContext {
     public void set(String key, String value) {
         try {
             // Test whether the key value will influence the creation of the Executor.
-            createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)));
+            createOperationExecutor(Configuration.fromMap(Collections.singletonMap(key, value)))
+                    .getTableEnvironment();
         } catch (Exception e) {
             // get error and reset the key with old value
             throw new SqlExecutionException(
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 6ba1b544500..22c4ffcdf32 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -301,7 +301,6 @@ public class OperationExecutor {
 
     // --------------------------------------------------------------------------------------------
 
-    @VisibleForTesting
     public TableEnvironmentInternal getTableEnvironment() {
         // checks the value of RUNTIME_MODE
         Configuration operationConfig = sessionContext.getSessionConf().clone();
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 890e55acc02..0ba90189172 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -65,6 +65,7 @@ import java.util.jar.JarFile;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.RestOptions.PORT;
 import static org.apache.flink.table.gateway.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
 import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
 import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
@@ -121,6 +122,8 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
         replaceVars.put(
                 "$VAR_BATCH_CTAS_PATH",
                 Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_REST_PORT", MINI_CLUSTER.getClientConfiguration().get(PORT).toString());
     }
 
     @TestTemplate
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index bc6976b887c..8115379b7c0 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -64,15 +64,12 @@ class SessionContextTest {
 
     @Test
     public void testSetAndResetOption() {
-        // table config option
-        sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
         // runtime config option
         sessionContext.set(MAX_PARALLELISM.key(), "128");
         // runtime config option and doesn't have default value
         sessionContext.set(NAME.key(), "test");
         // runtime config from flink-conf
         sessionContext.set(OBJECT_REUSE.key(), "false");
-        assertThat(sessionContext.getSessionConf().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
         assertThat(sessionContext.getSessionConf().get(MAX_PARALLELISM)).isEqualTo(128);
         assertThat(sessionContext.getSessionConf().get(NAME)).isEqualTo("test");
         assertThat(sessionContext.getSessionConf().get(OBJECT_REUSE)).isFalse();
@@ -89,8 +86,6 @@ class SessionContextTest {
 
     @Test
     public void testSetAndResetKeyInConfigOptions() {
-        // table config option
-        sessionContext.set(TABLE_SQL_DIALECT.key(), "hive");
         // runtime config option
         sessionContext.set(MAX_PARALLELISM.key(), "128");
         // runtime config option and doesn't have default value
@@ -98,7 +93,6 @@ class SessionContextTest {
         // runtime config from flink-conf
         sessionContext.set(OBJECT_REUSE.key(), "false");
 
-        assertThat(sessionContext.getSessionConf().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
         assertThat(sessionContext.getSessionConf().get(MAX_PARALLELISM)).isEqualTo(128);
         assertThat(sessionContext.getSessionConf().get(NAME)).isEqualTo("test");
         assertThat(sessionContext.getSessionConf().get(OBJECT_REUSE)).isFalse();
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
new file mode 100644
index 00000000000..c2e772c230a
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
@@ -0,0 +1,68 @@
+# set.q - SET, RESET
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+reset table.resources.download-dir;
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+set;
+!output
++--------------------------------------------+-----------+
+|                                        key |     value |
++--------------------------------------------+-----------+
+|                         execution.attached |      true |
+|           execution.savepoint-restore-mode |  NO_CLAIM |
+| execution.savepoint.ignore-unclaimed-state |     false |
+|        execution.shutdown-on-attached-exit |     false |
+|                           execution.target |    remote |
+|                     jobmanager.rpc.address | localhost |
+|                        pipeline.classpaths |           |
+|                              pipeline.jars |           |
+|                                  rest.port |     $VAR_REST_PORT |
++--------------------------------------------+-----------+
+9 rows in set
+!ok
+
+# set illegal value
+set 'table.sql-dialect' = 'unknown';
+!output
+java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.UNKNOWN
+!error
+
+set;
+!output
++--------------------------------------------+-----------+
+|                                        key |     value |
++--------------------------------------------+-----------+
+|                         execution.attached |      true |
+|           execution.savepoint-restore-mode |  NO_CLAIM |
+| execution.savepoint.ignore-unclaimed-state |     false |
+|        execution.shutdown-on-attached-exit |     false |
+|                           execution.target |    remote |
+|                     jobmanager.rpc.address | localhost |
+|                        pipeline.classpaths |           |
+|                              pipeline.jars |           |
+|                                  rest.port |     $VAR_REST_PORT |
++--------------------------------------------+-----------+
+9 rows in set
+!ok