You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/05 11:41:46 UTC
[flink] branch master updated: [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new abba1d7c853 [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)
abba1d7c853 is described below
commit abba1d7c853c8f198e65dd2dd073b51a19f133a4
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Aug 5 19:41:35 2022 +0800
[FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)
---
.../table/planner/delegation/hive/HiveParser.java | 12 ++++++++
.../flink/connectors/hive/HiveDialectITCase.java | 35 ++++++++++++++++++++++
2 files changed, 47 insertions(+)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 79ffb6bb760..96d3990ce88 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -57,6 +57,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -218,6 +219,8 @@ public class HiveParser extends ParserImpl {
HiveShim hiveShim =
HiveShimLoader.loadHiveShim(((HiveCatalog) currentCatalog).getHiveVersion());
try {
+ // substitute variables for the statement
+ statement = substituteVariables(hiveConf, statement);
// creates SessionState
startSessionState(hiveConf, catalogManager);
// We override Hive's grouping function. Refer to the implementation for more details.
@@ -294,6 +297,15 @@ public class HiveParser extends ParserImpl {
return new HiveSetOperation(nwcmd);
}
+ /**
+ * Substitutes the variables in the statement by delegating to Hive's VariableSubstitution.
+ * For the statement 'select ${hiveconf:foo}', the variable '${hiveconf:foo}' is replaced with
+ * the value stored under the key 'foo' in the Hive conf.
+ *
+ * <p>NOTE(review): {@code hiveVariables} is declared outside this hunk; presumably it holds the
+ * values set via 'set hivevar:...' -- confirm against the field declaration.
+ *
+ * @param conf the Hive configuration passed to {@code VariableSubstitution#substitute}
+ * @param statement the SQL statement that may contain variable references
+ * @return the string returned by {@code VariableSubstitution#substitute} for the statement
+ */
+ private String substituteVariables(HiveConf conf, String statement) {
+ return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
+ }
+
private List<Operation> processCmd(
String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
try {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index baf8c2ab4a5..e7f21e6de8b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1024,22 +1024,51 @@ public class HiveDialectITCase {
// test set system:
tableEnv.executeSql("set system:xxx=5");
assertThat(System.getProperty("xxx")).isEqualTo("5");
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select '${system:xxx}'").collect());
+ assertThat(result.toString()).isEqualTo("[+I[5]]");
// test set hiveconf:
tableEnv.executeSql("set hiveconf:yyy=${system:xxx}");
assertThat(hiveCatalog.getHiveConf().get("yyy")).isEqualTo("5");
+ // disable variable substitute
+ tableEnv.executeSql("set hiveconf:hive.variable.substitute=false");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select '${hiveconf:yyy}'").collect());
+ assertThat(result.toString()).isEqualTo("[+I[${hiveconf:yyy}]]");
+ // enable variable substitute again
+ tableEnv.executeSql("set hiveconf:hive.variable.substitute=true");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select ${hiveconf:yyy}").collect());
+ assertThat(result.toString()).isEqualTo("[+I[5]]");
// test set hivevar:
tableEnv.executeSql("set hivevar:a=1");
tableEnv.executeSql("set hiveconf:zzz=${hivevar:a}");
assertThat(hiveCatalog.getHiveConf().get("zzz")).isEqualTo("1");
+ // test set nested variables
+ tableEnv.executeSql("set hiveconf:b=a");
+ tableEnv.executeSql("set system:c=${hivevar:${hiveconf:b}}");
+ assertThat(System.getProperty("c")).isEqualTo("1");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select ${hivevar:${hiveconf:b}}").collect());
+ assertThat(result.toString()).isEqualTo("[+I[1]]");
+
// test the hivevar still exists when we renew the sql parser
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("show tables");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("set hiveconf:zzz1=${hivevar:a}");
assertThat(hiveCatalog.getHiveConf().get("zzz1")).isEqualTo("1");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select ${hiveconf:zzz1}").collect());
+ assertThat(result.toString()).isEqualTo("[+I[1]]");
// test set metaconf:
tableEnv.executeSql("set metaconf:hive.metastore.try.direct.sql=false");
@@ -1084,6 +1113,12 @@ public class HiveDialectITCase {
assertThatThrownBy(() -> tableEnv.executeSql("set env:xxx=yyy"))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("env:* variables can not be set.");
+ // test substitution for env variable in sql
+ String path = System.getenv("PATH");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select '${env:PATH}'").collect());
+ assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
}
@Test