You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/06 15:35:23 UTC

[flink] branch master updated (cf1a29d47a5 -> 1a094eb3619)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from cf1a29d47a5 [FLINK-27524][datastream] Introduce cache API to DataStream
     new 6d06cdaa163 [FLINK-28772][hive] Supports ADD JAR command in Hive dialect
     new 1a094eb3619 [FLINK-28772][hive] Improve exception messages for ADD JAR

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/planner/delegation/hive/HiveParser.java  | 65 +++++++++++++++-------
 .../flink/connectors/hive/HiveDialectITCase.java   | 28 ++++++++++
 .../flink/table/client/cli/CliClientITCase.java    | 11 ++++
 .../flink-sql-client/src/test/resources/sql/set.q  | 30 ++++++++++
 4 files changed, 113 insertions(+), 21 deletions(-)


[flink] 01/02: [FLINK-28772][hive] Supports ADD JAR command in Hive dialect

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d06cdaa1633035c127483af1dc4b45e57cbc035
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Jul 14 11:10:42 2022 +0800

    [FLINK-28772][hive] Supports ADD JAR command in Hive dialect
    
    This closes #20413
---
 .../table/planner/delegation/hive/HiveParser.java  | 65 +++++++++++++++-------
 .../flink/connectors/hive/HiveDialectITCase.java   | 28 ++++++++++
 .../flink/table/client/cli/CliClientITCase.java    | 11 ++++
 .../flink-sql-client/src/test/resources/sql/set.q  | 30 ++++++++++
 4 files changed, 113 insertions(+), 21 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 96d3990ce88..7ec483fdbfd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -208,7 +209,9 @@ public class HiveParser extends ParserImpl {
             return super.parse(statement);
         }
 
-        Optional<Operation> nonSqlOperation = tryProcessHiveNonSqlStatement(statement);
+        Optional<Operation> nonSqlOperation =
+                tryProcessHiveNonSqlStatement(
+                        ((HiveCatalog) currentCatalog).getHiveConf(), statement);
         if (nonSqlOperation.isPresent()) {
             return Collections.singletonList(nonSqlOperation.get());
         }
@@ -231,16 +234,22 @@ public class HiveParser extends ParserImpl {
         }
     }
 
-    private Optional<Operation> tryProcessHiveNonSqlStatement(String statement) {
+    private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
         String[] commandTokens = statement.split("\\s+");
         HiveCommand hiveCommand = HiveCommand.find(commandTokens);
         if (hiveCommand != null) {
+            String cmdArgs = statement.substring(commandTokens[0].length()).trim();
+            // the command may end with ";" since it won't be removed by Flink SQL CLI,
+            // so, we need to remove ";"
+            if (cmdArgs.endsWith(";")) {
+                cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
+            }
             if (hiveCommand == HiveCommand.SET) {
-                return Optional.of(
-                        processSetCmd(
-                                statement, statement.substring(commandTokens[0].length()).trim()));
+                return Optional.of(processSetCmd(statement, cmdArgs));
             } else if (hiveCommand == HiveCommand.RESET) {
                 return Optional.of(super.parse(statement).get(0));
+            } else if (hiveCommand == HiveCommand.ADD) {
+                return Optional.of(processAddCmd(substituteVariables(hiveConf, cmdArgs)));
             } else {
                 throw new UnsupportedOperationException(
                         String.format("The Hive command %s is not supported.", hiveCommand));
@@ -250,29 +259,22 @@ public class HiveParser extends ParserImpl {
     }
 
     private Operation processSetCmd(String originCmd, String setCmdArgs) {
-        String nwcmd = setCmdArgs.trim();
-        // the set command may end with ";" since it won't be removed by Flink SQL CLI,
-        // so, we need to remove ";"
-        if (nwcmd.endsWith(";")) {
-            nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
-        }
-
-        if (nwcmd.equals("")) {
+        if (setCmdArgs.equals("")) {
             return new HiveSetOperation();
         }
-        if (nwcmd.equals("-v")) {
+        if (setCmdArgs.equals("-v")) {
             return new HiveSetOperation(true);
         }
 
         String[] part = new String[2];
-        int eqIndex = nwcmd.indexOf('=');
-        if (nwcmd.contains("=")) {
-            if (eqIndex == nwcmd.length() - 1) { // x=
-                part[0] = nwcmd.substring(0, nwcmd.length() - 1);
+        int eqIndex = setCmdArgs.indexOf('=');
+        if (setCmdArgs.contains("=")) {
+            if (eqIndex == setCmdArgs.length() - 1) { // x=
+                part[0] = setCmdArgs.substring(0, setCmdArgs.length() - 1);
                 part[1] = "";
             } else { // x=y
-                part[0] = nwcmd.substring(0, eqIndex).trim();
-                part[1] = nwcmd.substring(eqIndex + 1).trim();
+                part[0] = setCmdArgs.substring(0, eqIndex).trim();
+                part[1] = setCmdArgs.substring(eqIndex + 1).trim();
                 if (!startWithHiveSpecialVariablePrefix(part[0])) {
                     // TODO:
                     // currently, for the command set key=value, we will fall to
@@ -294,7 +296,7 @@ public class HiveParser extends ParserImpl {
             }
             return new HiveSetOperation(part[0], part[1]);
         }
-        return new HiveSetOperation(nwcmd);
+        return new HiveSetOperation(setCmdArgs);
     }
 
     /**
@@ -306,6 +308,27 @@ public class HiveParser extends ParserImpl {
         return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
     }
 
+    private Operation processAddCmd(String addCmdArgs) {
+        String[] tokens = addCmdArgs.split("\\s+");
+        SessionState.ResourceType resourceType = SessionState.find_resource_type(tokens[0]);
+        if (resourceType == SessionState.ResourceType.FILE) {
+            throw new UnsupportedOperationException(
+                    "ADD FILE is not supported yet. Usage: add JAR <value>");
+        } else if (resourceType == SessionState.ResourceType.ARCHIVE) {
+            throw new UnsupportedOperationException(
+                    "Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+        } else if (resourceType == SessionState.ResourceType.JAR) {
+            if (tokens.length != 2) {
+                throw new UnsupportedOperationException(
+                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+            }
+            return new AddJarOperation(tokens[1]);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Unknown resource type: %s.", tokens[0]));
+        }
+    }
+
     private List<Operation> processCmd(
             String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
         try {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 0285fc3a4c7..4ecc4d7ea68 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
 import org.apache.flink.table.functions.hive.util.TestSplitUDTFInitializeWithStructObjectInspector;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ClearOperation;
 import org.apache.flink.table.operations.command.HelpOperation;
 import org.apache.flink.table.operations.command.QuitOperation;
@@ -1144,6 +1145,33 @@ public class HiveDialectITCase {
         assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
     }
 
+    @Test
+    public void testAddCommand() {
+        TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
+        Parser parser = tableEnvInternal.getParser();
+
+        // test add jar
+        Operation operation = parser.parse("add jar test.jar").get(0);
+        assertThat(operation).isInstanceOf(AddJarOperation.class);
+        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("test.jar");
+        // test add jar with variable substitute
+        operation = parser.parse("add jar \"${hiveconf:common-key}.jar\"").get(0);
+        assertThat(operation).isInstanceOf(AddJarOperation.class);
+        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("\"common-val.jar\"");
+
+        // test unsupported add command
+        assertThatThrownBy(() -> tableEnv.executeSql("add jar t1.jar t2.jar"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add File t1.txt"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("ADD FILE is not supported yet. Usage: add JAR <value>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add Archive t1.tgz"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+    }
+
     @Test
     public void testUnsupportedOperation() {
         List<String> statements =
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index e31ee990058..60a1e76b69b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -78,6 +78,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 @RunWith(Parameterized.class)
 public class CliClientITCase extends AbstractTestBase {
 
+    private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
+    private static final String HIVE_ADD_ONE_UDF_CODE =
+            "public class "
+                    + HIVE_ADD_ONE_UDF_CLASS
+                    + " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
+                    + " public int evaluate(int content) {\n"
+                    + "    return content + 1;\n"
+                    + " }"
+                    + "}\n";
+
     private static Path historyPath;
     private static Map<String, String> replaceVars;
 
@@ -109,6 +119,7 @@ public class CliClientITCase extends AbstractTestBase {
         classNameCodes.put(
                 GENERATED_UPPER_UDF_CLASS,
                 String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+        classNameCodes.put(HIVE_ADD_ONE_UDF_CLASS, HIVE_ADD_ONE_UDF_CODE);
 
         File udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 06edfa63997..29d1a7eb0ef 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -61,6 +61,36 @@ CREATE TABLE foo as select 1;
 1 row in set
 !ok
 
+# test add jar
+ADD JAR $VAR_UDF_JAR_PATH;
+[INFO] The specified jar is added into session classloader.
+!info
+
+SHOW JARS;
+$VAR_UDF_JAR_PATH
+!ok
+
+CREATE FUNCTION hive_add_one as 'HiveAddOneFunc';
+[INFO] Execute statement succeed.
+!info
+
+SELECT hive_add_one(1);
++----+-------------+
+| op |      _o__c0 |
++----+-------------+
+| +I |           2 |
++----+-------------+
+Received a total of 1 row
+!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok
+
 # list the configured configuration
 set;
 'execution.attached' = 'true'


[flink] 02/02: [FLINK-28772][hive] Improve exception messages for ADD JAR

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a094eb3619e9696759eaf1a5a0dad4e6852bb12
Author: Jark Wu <ja...@apache.org>
AuthorDate: Sat Aug 6 20:28:47 2022 +0800

    [FLINK-28772][hive] Improve exception messages for ADD JAR
    
    This closes #20413
---
 .../flink/table/planner/delegation/hive/HiveParser.java    |  6 +++---
 .../apache/flink/connectors/hive/HiveDialectITCase.java    | 14 +++++++-------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 7ec483fdbfd..1fb2bf88ce0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -313,14 +313,14 @@ public class HiveParser extends ParserImpl {
         SessionState.ResourceType resourceType = SessionState.find_resource_type(tokens[0]);
         if (resourceType == SessionState.ResourceType.FILE) {
             throw new UnsupportedOperationException(
-                    "ADD FILE is not supported yet. Usage: add JAR <value>");
+                    "ADD FILE is not supported yet. Usage: ADD JAR <file_path>");
         } else if (resourceType == SessionState.ResourceType.ARCHIVE) {
             throw new UnsupportedOperationException(
-                    "Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+                    "ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
         } else if (resourceType == SessionState.ResourceType.JAR) {
             if (tokens.length != 2) {
                 throw new UnsupportedOperationException(
-                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+                        "Add multiple jar in one single statement is not supported yet. Usage: ADD JAR <file_path>");
             }
             return new AddJarOperation(tokens[1]);
         } else {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 4ecc4d7ea68..0111e3e2f3d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -1155,21 +1155,21 @@ public class HiveDialectITCase {
         assertThat(operation).isInstanceOf(AddJarOperation.class);
         assertThat(((AddJarOperation) operation).getPath()).isEqualTo("test.jar");
         // test add jar with variable substitute
-        operation = parser.parse("add jar \"${hiveconf:common-key}.jar\"").get(0);
+        operation = parser.parse("add jar ${hiveconf:common-key}.jar").get(0);
         assertThat(operation).isInstanceOf(AddJarOperation.class);
-        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("\"common-val.jar\"");
+        assertThat(((AddJarOperation) operation).getPath()).isEqualTo("common-val.jar");
 
         // test unsupported add command
         assertThatThrownBy(() -> tableEnv.executeSql("add jar t1.jar t2.jar"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage(
-                        "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
-        assertThatThrownBy(() -> tableEnv.executeSql("add File t1.txt"))
+                        "Add multiple jar in one single statement is not supported yet. Usage: ADD JAR <file_path>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add file t1.txt"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("ADD FILE is not supported yet. Usage: add JAR <value>");
-        assertThatThrownBy(() -> tableEnv.executeSql("add Archive t1.tgz"))
+                .hasMessage("ADD FILE is not supported yet. Usage: ADD JAR <file_path>");
+        assertThatThrownBy(() -> tableEnv.executeSql("add archive t1.tgz"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+                .hasMessage("ADD ARCHIVE is not supported yet. Usage: ADD JAR <file_path>");
     }
 
     @Test