You are viewing a plain text version of this content. The canonical link to the original message was provided as a hyperlink in the HTML version and is not available in this plain-text rendering.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/06 15:35:24 UTC

[flink] 01/02: [FLINK-28772][hive] Supports ADD JAR command in Hive dialect

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d06cdaa1633035c127483af1dc4b45e57cbc035
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Jul 14 11:10:42 2022 +0800

    [FLINK-28772][hive] Supports ADD JAR command in Hive dialect
    
    This closes #20413
---
 .../table/planner/delegation/hive/HiveParser.java  | 65 +++++++++++++++-------
 .../flink/connectors/hive/HiveDialectITCase.java   | 28 ++++++++++
 .../flink/table/client/cli/CliClientITCase.java    | 11 ++++
 .../flink-sql-client/src/test/resources/sql/set.q  | 30 ++++++++++
 4 files changed, 113 insertions(+), 21 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 96d3990ce88..7ec483fdbfd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -208,7 +209,9 @@ public class HiveParser extends ParserImpl {
             return super.parse(statement);
         }
 
-        Optional<Operation> nonSqlOperation = tryProcessHiveNonSqlStatement(statement);
+        Optional<Operation> nonSqlOperation =
+                tryProcessHiveNonSqlStatement(
+                        ((HiveCatalog) currentCatalog).getHiveConf(), statement);
         if (nonSqlOperation.isPresent()) {
             return Collections.singletonList(nonSqlOperation.get());
         }
@@ -231,16 +234,22 @@ public class HiveParser extends ParserImpl {
         }
     }
 
-    private Optional<Operation> tryProcessHiveNonSqlStatement(String statement) {
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
         String[] commandTokens = statement.split("\\s+");
         HiveCommand hiveCommand = HiveCommand.find(commandTokens);
         if (hiveCommand != null) {
+            String cmdArgs = statement.substring(commandTokens[0].length()).trim();
+            // the command may end with ";" since it won't be removed by Flink SQL CLI,
+            // so, we need to remove ";"
+            if (cmdArgs.endsWith(";")) {
+                cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
+            }
             if (hiveCommand == HiveCommand.SET) {
-                return Optional.of(
-                        processSetCmd(
-                                statement, statement.substring(commandTokens[0].length()).trim()));
+                return Optional.of(processSetCmd(statement, cmdArgs));
             } else if (hiveCommand == HiveCommand.RESET) {
                 return Optional.of(super.parse(statement).get(0));
+            } else if (hiveCommand == HiveCommand.ADD) {
+                return Optional.of(processAddCmd(substituteVariables(hiveConf, cmdArgs)));
             } else {
                 throw new UnsupportedOperationException(
                         String.format("The Hive command %s is not supported.", hiveCommand));
@@ -250,29 +259,22 @@ public class HiveParser extends ParserImpl {
     }
 
     private Operation processSetCmd(String originCmd, String setCmdArgs) {
-        String nwcmd = setCmdArgs.trim();
-        // the set command may end with ";" since it won't be removed by Flink SQL CLI,
-        // so, we need to remove ";"
-        if (nwcmd.endsWith(";")) {
-            nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
-        }
-
-        if (nwcmd.equals("")) {
+        if (setCmdArgs.equals("")) {
             return new HiveSetOperation();
         }
-        if (nwcmd.equals("-v")) {
+        if (setCmdArgs.equals("-v")) {
             return new HiveSetOperation(true);
         }
 
         String[] part = new String[2];
-        int eqIndex = nwcmd.indexOf('=');
-        if (nwcmd.contains("=")) {
-            if (eqIndex == nwcmd.length() - 1) { // x=
-                part[0] = nwcmd.substring(0, nwcmd.length() - 1);
+        int eqIndex = setCmdArgs.indexOf('=');
+        if (setCmdArgs.contains("=")) {
+            if (eqIndex == setCmdArgs.length() - 1) { // x=
+                part[0] = setCmdArgs.substring(0, setCmdArgs.length() - 1);
                 part[1] = "";
             } else { // x=y
-                part[0] = nwcmd.substring(0, eqIndex).trim();
-                part[1] = nwcmd.substring(eqIndex + 1).trim();
+                part[0] = setCmdArgs.substring(0, eqIndex).trim();
+                part[1] = setCmdArgs.substring(eqIndex + 1).trim();
                 if (!startWithHiveSpecialVariablePrefix(part[0])) {
                     // TODO:
                     // currently, for the command set key=value, we will fall to
@@ -294,7 +296,7 @@ public class HiveParser extends ParserImpl {
             }
             return new HiveSetOperation(part[0], part[1]);
         }
-        return new HiveSetOperation(nwcmd);
+        return new HiveSetOperation(setCmdArgs);
     }
 
     /**
@@ -306,6 +308,27 @@ public class HiveParser extends ParserImpl {
         return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
     }
 
+    private Operation processAddCmd(String addCmdArgs) {
+        String[] tokens = addCmdArgs.split("\\s+");
+        SessionState.ResourceType resourceType = SessionState.find_resource_type(tokens[0]);
+        if (resourceType == SessionState.ResourceType.FILE) {
+            throw new UnsupportedOperationException(
+                    "ADD FILE is not supported yet. Usage: add JAR <value>");
+        } else if (resourceType == SessionState.ResourceType.ARCHIVE) {
+            throw new UnsupportedOperationException(
+                    "Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+        } else if (resourceType == SessionState.ResourceType.JAR) {
+            if (tokens.length != 2) {
+                throw new UnsupportedOperationException(
+                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+            }
+            return new AddJarOperation(tokens[1]);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Unknown resource type: %s.", tokens[0]));
+        }
+    }
+
     private List<Operation> processCmd(
             String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
         try {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 0285fc3a4c7..4ecc4d7ea68 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
 import org.apache.flink.table.functions.hive.util.TestSplitUDTFInitializeWithStructObjectInspector;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ClearOperation;
 import org.apache.flink.table.operations.command.HelpOperation;
 import org.apache.flink.table.operations.command.QuitOperation;
@@ -1144,6 +1145,33 @@ public class HiveDialectITCase {
         assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
     }
 
+    @Test
+    public void testAddCommand() {
+        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
+        Parser parser = tableEnvInternal.getParser();
+
+        // test add jar
+        Operation operation = parser.parse("add jar test.jar").get(0);
+        assertThat(operation).isInstanceOf(AddJarOperation.class);
+        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("test.jar");
+        // test add jar with variable substitute
+        operation = parser.parse("add jar \"${hiveconf:common-key}.jar\"").get(0);
+        assertThat(operation).isInstanceOf(AddJarOperation.class);
+        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("\"common-val.jar\"");
+
+        // test unsupported add command
+        assertThatThrownBy(() -> tableEnv.executeSql("add jar t1.jar t2.jar"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add File t1.txt"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("ADD FILE is not supported yet. Usage: add JAR <value>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add Archive t1.tgz"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+    }
+
     @Test
     public void testUnsupportedOperation() {
         List<String> statements =
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index e31ee990058..60a1e76b69b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -78,6 +78,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 @RunWith(Parameterized.class)
 public class CliClientITCase extends AbstractTestBase {
 
+    private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
+    private static final String HIVE_ADD_ONE_UDF_CODE =
+            "public class "
+                    + HIVE_ADD_ONE_UDF_CLASS
+                    + " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
+                    + " public int evaluate(int content) {\n"
+                    + "    return content + 1;\n"
+                    + " }"
+                    + "}\n";
+
     private static Path historyPath;
     private static Map<String, String> replaceVars;
 
@@ -109,6 +119,7 @@ public class CliClientITCase extends AbstractTestBase {
         classNameCodes.put(
                 GENERATED_UPPER_UDF_CLASS,
                 String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+        classNameCodes.put(HIVE_ADD_ONE_UDF_CLASS, HIVE_ADD_ONE_UDF_CODE);
 
         File udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 06edfa63997..29d1a7eb0ef 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -61,6 +61,36 @@ CREATE TABLE foo as select 1;
 1 row in set
 !ok
 
+# test add jar
+ADD JAR $VAR_UDF_JAR_PATH;
+[INFO] The specified jar is added into session classloader.
+!info
+
+SHOW JARS;
+$VAR_UDF_JAR_PATH
+!ok
+
+CREATE FUNCTION hive_add_one as 'HiveAddOneFunc';
+[INFO] Execute statement succeed.
+!info
+
+SELECT hive_add_one(1);
++----+-------------+
+| op |      _o__c0 |
++----+-------------+
+| +I |           2 |
++----+-------------+
+Received a total of 1 row
+!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok
+
 # list the configured configuration
 set;
 'execution.attached' = 'true'