Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 01:46:51 UTC

[flink] 04/04: [FLINK-29191][hive] Fix Hive dialect not getting values for variables set by the SET command

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4d855a3c08733afac935d87df6544f0811aef84
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 7 10:54:26 2022 +0800

    [FLINK-29191][hive] Fix Hive dialect not getting values for variables set by the SET command
---
 .../delegation/hive/HiveOperationExecutor.java     |  7 +++++--
 .../table/planner/delegation/hive/HiveParser.java  | 18 ++++++++++------
 .../delegation/hive/copy/HiveSetProcessor.java     | 21 ++++++++++++-------
 .../flink/connectors/hive/HiveDialectITCase.java   | 13 ++----------
 .../flink-sql-client/src/test/resources/sql/set.q  | 24 ++++++++++++++++++++++
 5 files changed, 57 insertions(+), 26 deletions(-)
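
For orientation, the user-visible symptom this commit addresses: with the Hive dialect active and a HiveCatalog as the current catalog, a key set through the SET command could not be read back with "set <key>". Below is a minimal sketch of that scenario through the Table API, assuming a HiveCatalog named "hive" has already been registered; the catalog name and the key "k1" are illustrative, not part of the patch.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SetVariableScenario {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Assumption: a HiveCatalog named "hive" was registered on tableEnv
            // beforehand; switch to it and enable the Hive dialect.
            tableEnv.executeSql("use catalog hive");
            tableEnv.executeSql("set table.sql-dialect=hive");

            tableEnv.executeSql("set k1=v1");
            // Before this fix the lookup below could not see k1;
            // with the patch it returns a single row "k1=v1".
            tableEnv.executeSql("set k1").print();
        }
    }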

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 597584cc3fd..664ce4e6e35 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
                 catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
         if (!(currentCatalog instanceof HiveCatalog)) {
             throw new FlinkHiveException(
-                    "Only support SET command when the current catalog is HiveCatalog ing Hive dialect.");
+                    "Only support SET command when the current catalog is HiveCatalog in Hive dialect.");
         }
 
         HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf();
@@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
                 // set key
                 String option =
                         HiveSetProcessor.getVariable(
-                                hiveConf, hiveVariables, hiveSetOperation.getKey().get());
+                                tableConfig.getConfiguration().toMap(),
+                                hiveConf,
+                                hiveVariables,
+                                hiveSetOperation.getKey().get());
                 return Optional.of(buildResultForShowVariable(Collections.singletonList(option)));
             } else {
                 HiveSetProcessor.setVariable(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 0dd5613a8e3..c8d1630133e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl {
     }
 
     private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+        statement = statement.trim();
+        if (statement.endsWith(";")) {
+            // the command may end with ";" since it won't be removed by Flink SQL CLI,
+            // so, we need to remove ";"
+            statement = statement.substring(0, statement.length() - 1);
+        }
         String[] commandTokens = statement.split("\\s+");
         HiveCommand hiveCommand = HiveCommand.find(commandTokens);
         if (hiveCommand != null) {
             String cmdArgs = statement.substring(commandTokens[0].length()).trim();
-            // the command may end with ";" since it won't be removed by Flink SQL CLI,
-            // so, we need to remove ";"
-            if (cmdArgs.endsWith(";")) {
-                cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
-            }
             if (hiveCommand == HiveCommand.SET) {
                 return Optional.of(processSetCmd(statement, cmdArgs));
             } else if (hiveCommand == HiveCommand.RESET) {
@@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl {
 
     private Operation processSetCmd(String originCmd, String setCmdArgs) {
         if (setCmdArgs.equals("")) {
-            return new HiveSetOperation();
+            // the command is plain "set"; following Hive's behavior would output all
+            // configurations, including hiveconf, hivevar, env, ..., which is too verbose.
+            // So for this case we just delegate to Flink's own behavior,
+            // which only outputs the Flink configuration.
+            return super.parse(originCmd).get(0);
         }
         if (setCmdArgs.equals("-v")) {
+            // the command is "set -v"; in this case we follow Hive's behavior.
             return new HiveSetOperation(true);
         }
 
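Two details in the HiveParser change are easy to miss: the trailing ';' (which the SQL CLI does not strip) is now removed from the whole statement before it is tokenized, and a bare "set" is handed back to Flink's own parser. A small standalone illustration of the normalization step follows; the class and method names are made up for the example and are not part of the patch.

    public class StatementNormalizationSketch {
        // Mirrors the normalization the parser now applies before splitting the
        // statement into command tokens.
        static String normalize(String statement) {
            statement = statement.trim();
            if (statement.endsWith(";")) {
                // The SQL CLI does not strip the trailing ';', so remove it here.
                statement = statement.substring(0, statement.length() - 1);
            }
            return statement;
        }

        public static void main(String[] args) {
            // "  set k1=v1; " -> "set k1=v1" -> tokens ["set", "k1=v1"],
            // so HiveCommand.find can recognize the SET command.
            String[] tokens = normalize("  set k1=v1; ").split("\\s+");
            System.out.println(String.join("|", tokens)); // prints: set|k1=v1
        }
    }
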
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
index ff8b7de2c8b..52577ca0136 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
@@ -43,7 +43,6 @@ import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
 public class HiveSetProcessor {
 
     private static final String[] PASSWORD_STRINGS = new String[] {"password", "paswd", "pswd"};
-    private static final String FLINK_PREFIX = "flink:";
 
     /** Set variable following Hive's implementation. */
     public static void setVariable(
@@ -137,7 +136,10 @@ public class HiveSetProcessor {
     }
 
     public static String getVariable(
-            HiveConf hiveConf, Map<String, String> hiveVariables, String varname) {
+            Map<String, String> flinkConf,
+            HiveConf hiveConf,
+            Map<String, String> hiveVariables,
+            String varname) {
         if (varname.equals("silent")) {
             return "silent is not a valid variable";
         }
@@ -198,7 +200,7 @@ public class HiveSetProcessor {
                 return varname + " is undefined as a hive meta variable";
             }
         } else {
-            return dumpOption(hiveConf, hiveVariables, varname);
+            return dumpOption(flinkConf, hiveConf, hiveVariables, varname);
         }
     }
 
@@ -216,8 +218,13 @@ public class HiveSetProcessor {
     }
 
     private static String dumpOption(
-            HiveConf hiveConf, Map<String, String> hiveVariables, String s) {
-        if (hiveConf.isHiddenConfig(s)) {
+            Map<String, String> flinkConf,
+            HiveConf hiveConf,
+            Map<String, String> hiveVariables,
+            String s) {
+        if (flinkConf.get(s) != null) {
+            return s + "=" + flinkConf.get(s);
+        } else if (hiveConf.isHiddenConfig(s)) {
             return s + " is a hidden config";
         } else if (hiveConf.get(s) != null) {
             return s + "=" + hiveConf.get(s);
@@ -241,7 +248,7 @@ public class HiveSetProcessor {
             if (hiveConf.isHiddenConfig(oneProp)) {
                 continue;
             }
-            sortedMap.put(oneProp, oneValue);
+            sortedMap.put(HIVECONF_PREFIX + oneProp, oneValue);
         }
 
         // Inserting hive variables
@@ -271,7 +278,7 @@ public class HiveSetProcessor {
         // Insert Flink table config variable
         for (Map.Entry<String, String> entry :
                 mapToSortedMap(tableConfig.getConfiguration().toMap()).entrySet()) {
-            optionsList.add(FLINK_PREFIX + entry.getKey() + "=" + entry.getValue());
+            optionsList.add(entry.getKey() + "=" + entry.getValue());
         }
 
         return optionsList;
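
The behavioral core of the HiveSetProcessor change is the lookup precedence: the Flink session configuration is consulted before Hive's hidden-config check and the HiveConf/Hive-variable lookups. Here is a standalone sketch of that precedence using plain maps; hidden-config handling and the remaining fallbacks of the real dumpOption are omitted, and the names are illustrative.

    import java.util.Map;

    public class VariableLookupSketch {
        // Flink session options now win over Hive configuration and Hive variables.
        static String lookup(
                Map<String, String> flinkConf,
                Map<String, String> hiveConf,
                Map<String, String> hiveVariables,
                String name) {
            if (flinkConf.containsKey(name)) {
                // Keys set through Flink's SET command are visible here now.
                return name + "=" + flinkConf.get(name);
            } else if (hiveConf.containsKey(name)) {
                return name + "=" + hiveConf.get(name);
            } else if (hiveVariables.containsKey(name)) {
                return name + "=" + hiveVariables.get(name);
            }
            return name + " is undefined";
        }

        public static void main(String[] args) {
            // A key that only exists in the Flink configuration is now resolvable.
            System.out.println(lookup(Map.of("k1", "v1"), Map.of(), Map.of(), "k1"));
            // prints: k1=v1
        }
    }
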
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 1ae21e8f27a..42e3a2ba908 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1126,23 +1126,14 @@ public class HiveDialectITCase {
                 CollectionUtil.iteratorToList(tableEnv.executeSql("set system:xxx").collect());
         assertThat(rows.toString()).isEqualTo("[+I[system:xxx=5]]");
 
-        // test 'set'
-        rows = CollectionUtil.iteratorToList(tableEnv.executeSql("set").collect());
-        assertThat(rows.toString())
-                .contains("system:xxx=5")
-                .contains("env:PATH=" + System.getenv("PATH"))
-                .contains("flink:execution.runtime-mode=BATCH")
-                .contains("hivevar:a=1")
-                .contains("common-key=common-val");
-
         // test 'set -v'
         rows = CollectionUtil.iteratorToList(tableEnv.executeSql("set -v").collect());
         assertThat(rows.toString())
                 .contains("system:xxx=5")
                 .contains("env:PATH=" + System.getenv("PATH"))
-                .contains("flink:execution.runtime-mode=BATCH")
                 .contains("hivevar:a=1")
-                .contains("fs.defaultFS=file:///");
+                .contains("hiveconf:fs.defaultFS=file:///")
+                .contains("execution.runtime-mode=BATCH");
 
         // test set env isn't supported
         assertThatThrownBy(() -> tableEnv.executeSql("set env:xxx=yyy"))
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index b87130b6eba..a450c4e9b42 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -35,6 +35,29 @@ use catalog hivecatalog;
 [INFO] Execute statement succeed.
 !info
 
+# test SET command
+set table.sql-dialect;
++------------------------+
+|              variables |
++------------------------+
+| table.sql-dialect=hive |
++------------------------+
+1 row in set
+!ok
+
+set k1=v1;
+[INFO] Session property has been set.
+!info
+
+set k1;
++-----------+
+| variables |
++-----------+
+|     k1=v1 |
++-----------+
+1 row in set
+!ok
+
 # test create a hive table to verify the configuration works
 CREATE TABLE hive_table (
   product_id STRING,
@@ -104,6 +127,7 @@ set;
 'execution.shutdown-on-attached-exit' = 'false'
 'execution.target' = 'remote'
 'jobmanager.rpc.address' = '$VAR_JOBMANAGER_RPC_ADDRESS'
+'k1' = 'v1'
 'pipeline.classpaths' = ''
 'pipeline.jars' = ''
 'rest.port' = '$VAR_REST_PORT'