You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 01:46:51 UTC
[flink] 04/04: [FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d4d855a3c08733afac935d87df6544f0811aef84
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 7 10:54:26 2022 +0800
[FLINK-29191][hive] fix Hive dialect can't get value for the variables set by set command
---
.../delegation/hive/HiveOperationExecutor.java | 7 +++++--
.../table/planner/delegation/hive/HiveParser.java | 18 ++++++++++------
.../delegation/hive/copy/HiveSetProcessor.java | 21 ++++++++++++-------
.../flink/connectors/hive/HiveDialectITCase.java | 13 ++----------
.../flink-sql-client/src/test/resources/sql/set.q | 24 ++++++++++++++++++++++
5 files changed, 57 insertions(+), 26 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 597584cc3fd..664ce4e6e35 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -85,7 +85,7 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
if (!(currentCatalog instanceof HiveCatalog)) {
throw new FlinkHiveException(
- "Only support SET command when the current catalog is HiveCatalog ing Hive dialect.");
+ "Only support SET command when the current catalog is HiveCatalog in Hive dialect.");
}
HiveConf hiveConf = ((HiveCatalog) currentCatalog).getHiveConf();
@@ -112,7 +112,10 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
// set key
String option =
HiveSetProcessor.getVariable(
- hiveConf, hiveVariables, hiveSetOperation.getKey().get());
+ tableConfig.getConfiguration().toMap(),
+ hiveConf,
+ hiveVariables,
+ hiveSetOperation.getKey().get());
return Optional.of(buildResultForShowVariable(Collections.singletonList(option)));
} else {
HiveSetProcessor.setVariable(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 0dd5613a8e3..c8d1630133e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -217,15 +217,16 @@ public class HiveParser extends ParserImpl {
}
private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
+ statement = statement.trim();
+ if (statement.endsWith(";")) {
+ // the Flink SQL CLI does not strip the trailing ";" from the command,
+ // so we remove it here before splitting the statement into tokens
+ statement = statement.substring(0, statement.length() - 1);
+ }
String[] commandTokens = statement.split("\\s+");
HiveCommand hiveCommand = HiveCommand.find(commandTokens);
if (hiveCommand != null) {
String cmdArgs = statement.substring(commandTokens[0].length()).trim();
- // the command may end with ";" since it won't be removed by Flink SQL CLI,
- // so, we need to remove ";"
- if (cmdArgs.endsWith(";")) {
- cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
- }
if (hiveCommand == HiveCommand.SET) {
return Optional.of(processSetCmd(statement, cmdArgs));
} else if (hiveCommand == HiveCommand.RESET) {
@@ -242,9 +243,14 @@ public class HiveParser extends ParserImpl {
private Operation processSetCmd(String originCmd, String setCmdArgs) {
if (setCmdArgs.equals("")) {
- return new HiveSetOperation();
+ // the command is a bare "set"; following Hive's behavior would output all configurations
+ // (hiveconf, hivevar, env, ...), which is far too much output.
+ // So in this case we simply delegate to Flink's own behavior,
+ // which only outputs the Flink configuration.
+ return super.parse(originCmd).get(0);
}
if (setCmdArgs.equals("-v")) {
+ // the command is "set -v"; in this case we follow Hive's behavior.
return new HiveSetOperation(true);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
index ff8b7de2c8b..52577ca0136 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveSetProcessor.java
@@ -43,7 +43,6 @@ import static org.apache.hadoop.hive.conf.SystemVariables.SYSTEM_PREFIX;
public class HiveSetProcessor {
private static final String[] PASSWORD_STRINGS = new String[] {"password", "paswd", "pswd"};
- private static final String FLINK_PREFIX = "flink:";
/** Set variable following Hive's implementation. */
public static void setVariable(
@@ -137,7 +136,10 @@ public class HiveSetProcessor {
}
public static String getVariable(
- HiveConf hiveConf, Map<String, String> hiveVariables, String varname) {
+ Map<String, String> flinkConf,
+ HiveConf hiveConf,
+ Map<String, String> hiveVariables,
+ String varname) {
if (varname.equals("silent")) {
return "silent is not a valid variable";
}
@@ -198,7 +200,7 @@ public class HiveSetProcessor {
return varname + " is undefined as a hive meta variable";
}
} else {
- return dumpOption(hiveConf, hiveVariables, varname);
+ return dumpOption(flinkConf, hiveConf, hiveVariables, varname);
}
}
@@ -216,8 +218,13 @@ public class HiveSetProcessor {
}
private static String dumpOption(
- HiveConf hiveConf, Map<String, String> hiveVariables, String s) {
- if (hiveConf.isHiddenConfig(s)) {
+ Map<String, String> flinkConf,
+ HiveConf hiveConf,
+ Map<String, String> hiveVariables,
+ String s) {
+ if (flinkConf.get(s) != null) {
+ return s + "=" + flinkConf.get(s);
+ } else if (hiveConf.isHiddenConfig(s)) {
return s + " is a hidden config";
} else if (hiveConf.get(s) != null) {
return s + "=" + hiveConf.get(s);
@@ -241,7 +248,7 @@ public class HiveSetProcessor {
if (hiveConf.isHiddenConfig(oneProp)) {
continue;
}
- sortedMap.put(oneProp, oneValue);
+ sortedMap.put(HIVECONF_PREFIX + oneProp, oneValue);
}
// Inserting hive variables
@@ -271,7 +278,7 @@ public class HiveSetProcessor {
// Insert Flink table config variable
for (Map.Entry<String, String> entry :
mapToSortedMap(tableConfig.getConfiguration().toMap()).entrySet()) {
- optionsList.add(FLINK_PREFIX + entry.getKey() + "=" + entry.getValue());
+ optionsList.add(entry.getKey() + "=" + entry.getValue());
}
return optionsList;
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 1ae21e8f27a..42e3a2ba908 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1126,23 +1126,14 @@ public class HiveDialectITCase {
CollectionUtil.iteratorToList(tableEnv.executeSql("set system:xxx").collect());
assertThat(rows.toString()).isEqualTo("[+I[system:xxx=5]]");
- // test 'set'
- rows = CollectionUtil.iteratorToList(tableEnv.executeSql("set").collect());
- assertThat(rows.toString())
- .contains("system:xxx=5")
- .contains("env:PATH=" + System.getenv("PATH"))
- .contains("flink:execution.runtime-mode=BATCH")
- .contains("hivevar:a=1")
- .contains("common-key=common-val");
-
// test 'set -v'
rows = CollectionUtil.iteratorToList(tableEnv.executeSql("set -v").collect());
assertThat(rows.toString())
.contains("system:xxx=5")
.contains("env:PATH=" + System.getenv("PATH"))
- .contains("flink:execution.runtime-mode=BATCH")
.contains("hivevar:a=1")
- .contains("fs.defaultFS=file:///");
+ .contains("hiveconf:fs.defaultFS=file:///")
+ .contains("execution.runtime-mode=BATCH");
// test set env isn't supported
assertThatThrownBy(() -> tableEnv.executeSql("set env:xxx=yyy"))
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index b87130b6eba..a450c4e9b42 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -35,6 +35,29 @@ use catalog hivecatalog;
[INFO] Execute statement succeed.
!info
+# test SET command
+set table.sql-dialect;
++------------------------+
+| variables |
++------------------------+
+| table.sql-dialect=hive |
++------------------------+
+1 row in set
+!ok
+
+set k1=v1;
+[INFO] Session property has been set.
+!info
+
+set k1;
++-----------+
+| variables |
++-----------+
+| k1=v1 |
++-----------+
+1 row in set
+!ok
+
# test create a hive table to verify the configuration works
CREATE TABLE hive_table (
product_id STRING,
@@ -104,6 +127,7 @@ set;
'execution.shutdown-on-attached-exit' = 'false'
'execution.target' = 'remote'
'jobmanager.rpc.address' = '$VAR_JOBMANAGER_RPC_ADDRESS'
+'k1' = 'v1'
'pipeline.classpaths' = ''
'pipeline.jars' = ''
'rest.port' = '$VAR_REST_PORT'