You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/05 11:41:46 UTC

[flink] branch master updated: [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new abba1d7c853 [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)
abba1d7c853 is described below

commit abba1d7c853c8f198e65dd2dd073b51a19f133a4
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Aug 5 19:41:35 2022 +0800

    [FLINK-26371][hive] Support variable substitution for sql statement while using Hive dialect (#19656)
---
 .../table/planner/delegation/hive/HiveParser.java  | 12 ++++++++
 .../flink/connectors/hive/HiveDialectITCase.java   | 35 ++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 79ffb6bb760..96d3990ce88 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -57,6 +57,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -218,6 +219,8 @@ public class HiveParser extends ParserImpl {
         HiveShim hiveShim =
                 HiveShimLoader.loadHiveShim(((HiveCatalog) currentCatalog).getHiveVersion());
         try {
+            // substitute variables for the statement
+            statement = substituteVariables(hiveConf, statement);
             // creates SessionState
             startSessionState(hiveConf, catalogManager);
             // We override Hive's grouping function. Refer to the implementation for more details.
@@ -294,6 +297,15 @@ public class HiveParser extends ParserImpl {
         return new HiveSetOperation(nwcmd);
     }
 
+    /**
+     * Substitutes the variables in the statement. For statement 'select ${hiveconf:foo}', the
+     * variable '${hiveconf:foo}' is replaced with the value stored under key 'foo' in the given
+     * hive conf. {@code hiveVariables} is passed to the substitution as well.
+     */
+    private String substituteVariables(HiveConf conf, String statement) {
+        return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
+    }
+
     private List<Operation> processCmd(
             String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
         try {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index baf8c2ab4a5..e7f21e6de8b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1024,22 +1024,51 @@ public class HiveDialectITCase {
         // test set system:
         tableEnv.executeSql("set system:xxx=5");
         assertThat(System.getProperty("xxx")).isEqualTo("5");
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select '${system:xxx}'").collect());
+        assertThat(result.toString()).isEqualTo("[+I[5]]");
 
         // test set hiveconf:
         tableEnv.executeSql("set hiveconf:yyy=${system:xxx}");
         assertThat(hiveCatalog.getHiveConf().get("yyy")).isEqualTo("5");
+        // disable variable substitution
+        tableEnv.executeSql("set hiveconf:hive.variable.substitute=false");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select '${hiveconf:yyy}'").collect());
+        assertThat(result.toString()).isEqualTo("[+I[${hiveconf:yyy}]]");
+        // enable variable substitution again
+        tableEnv.executeSql("set hiveconf:hive.variable.substitute=true");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select ${hiveconf:yyy}").collect());
+        assertThat(result.toString()).isEqualTo("[+I[5]]");
 
         // test set hivevar:
         tableEnv.executeSql("set hivevar:a=1");
         tableEnv.executeSql("set hiveconf:zzz=${hivevar:a}");
         assertThat(hiveCatalog.getHiveConf().get("zzz")).isEqualTo("1");
 
+        // test set nested variables
+        tableEnv.executeSql("set hiveconf:b=a");
+        tableEnv.executeSql("set system:c=${hivevar:${hiveconf:b}}");
+        assertThat(System.getProperty("c")).isEqualTo("1");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select ${hivevar:${hiveconf:b}}").collect());
+        assertThat(result.toString()).isEqualTo("[+I[1]]");
+
         // test the hivevar still exists when we renew the sql parser
         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         tableEnv.executeSql("show tables");
         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         tableEnv.executeSql("set hiveconf:zzz1=${hivevar:a}");
         assertThat(hiveCatalog.getHiveConf().get("zzz1")).isEqualTo("1");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select ${hiveconf:zzz1}").collect());
+        assertThat(result.toString()).isEqualTo("[+I[1]]");
 
         // test set metaconf:
         tableEnv.executeSql("set metaconf:hive.metastore.try.direct.sql=false");
@@ -1084,6 +1113,12 @@ public class HiveDialectITCase {
         assertThatThrownBy(() -> tableEnv.executeSql("set env:xxx=yyy"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("env:* variables can not be set.");
+        // test substitution for env variable in sql
+        String path = System.getenv("PATH");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select '${env:PATH}'").collect());
+        assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
     }
 
     @Test