Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/06 15:35:24 UTC
[flink] 01/02: [FLINK-28772][hive] Supports ADD JAR command in Hive dialect
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d06cdaa1633035c127483af1dc4b45e57cbc035
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Jul 14 11:10:42 2022 +0800
[FLINK-28772][hive] Supports ADD JAR command in Hive dialect
This closes #20413
---
.../table/planner/delegation/hive/HiveParser.java | 65 +++++++++++++++-------
.../flink/connectors/hive/HiveDialectITCase.java | 28 ++++++++++
.../flink/table/client/cli/CliClientITCase.java | 11 ++++
.../flink-sql-client/src/test/resources/sql/set.q | 30 ++++++++++
4 files changed, 113 insertions(+), 21 deletions(-)
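For context, a minimal sketch of how the new command could be exercised from a Table API program once the patch is in place. The jar path and program name below are illustrative, not part of the patch, and a HiveCatalog must already be registered and set as the current catalog for the Hive dialect parser to be used:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;

    public class AddJarExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Switch to the Hive dialect; the HiveParser is only active when a
            // HiveCatalog is the current catalog.
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

            // Illustrative path: ADD JAR registers the jar in the session
            // classloader, after which functions from the jar can be created.
            tableEnv.executeSql("ADD JAR /tmp/my-udf.jar");
            tableEnv.executeSql("SHOW JARS").print();
        }
    }
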
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 96d3990ce88..7ec483fdbfd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -208,7 +209,9 @@ public class HiveParser extends ParserImpl {
return super.parse(statement);
}
- Optional<Operation> nonSqlOperation = tryProcessHiveNonSqlStatement(statement);
+ Optional<Operation> nonSqlOperation =
+ tryProcessHiveNonSqlStatement(
+ ((HiveCatalog) currentCatalog).getHiveConf(), statement);
if (nonSqlOperation.isPresent()) {
return Collections.singletonList(nonSqlOperation.get());
}
@@ -231,16 +234,22 @@ public class HiveParser extends ParserImpl {
}
}
- private Optional<Operation> tryProcessHiveNonSqlStatement(String statement) {
+ private Optional<Operation> tryProcessHiveNonSqlStatement(HiveConf hiveConf, String statement) {
String[] commandTokens = statement.split("\\s+");
HiveCommand hiveCommand = HiveCommand.find(commandTokens);
if (hiveCommand != null) {
+ String cmdArgs = statement.substring(commandTokens[0].length()).trim();
+ // the command may end with ";" since it won't be removed by Flink SQL CLI,
+ // so, we need to remove ";"
+ if (cmdArgs.endsWith(";")) {
+ cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
+ }
if (hiveCommand == HiveCommand.SET) {
- return Optional.of(
- processSetCmd(
- statement, statement.substring(commandTokens[0].length()).trim()));
+ return Optional.of(processSetCmd(statement, cmdArgs));
} else if (hiveCommand == HiveCommand.RESET) {
return Optional.of(super.parse(statement).get(0));
+ } else if (hiveCommand == HiveCommand.ADD) {
+ return Optional.of(processAddCmd(substituteVariables(hiveConf, cmdArgs)));
} else {
throw new UnsupportedOperationException(
String.format("The Hive command %s is not supported.", hiveCommand));
@@ -250,29 +259,22 @@ public class HiveParser extends ParserImpl {
}
private Operation processSetCmd(String originCmd, String setCmdArgs) {
- String nwcmd = setCmdArgs.trim();
- // the set command may end with ";" since it won't be removed by Flink SQL CLI,
- // so, we need to remove ";"
- if (nwcmd.endsWith(";")) {
- nwcmd = nwcmd.substring(0, nwcmd.length() - 1);
- }
-
- if (nwcmd.equals("")) {
+ if (setCmdArgs.equals("")) {
return new HiveSetOperation();
}
- if (nwcmd.equals("-v")) {
+ if (setCmdArgs.equals("-v")) {
return new HiveSetOperation(true);
}
String[] part = new String[2];
- int eqIndex = nwcmd.indexOf('=');
- if (nwcmd.contains("=")) {
- if (eqIndex == nwcmd.length() - 1) { // x=
- part[0] = nwcmd.substring(0, nwcmd.length() - 1);
+ int eqIndex = setCmdArgs.indexOf('=');
+ if (setCmdArgs.contains("=")) {
+ if (eqIndex == setCmdArgs.length() - 1) { // x=
+ part[0] = setCmdArgs.substring(0, setCmdArgs.length() - 1);
part[1] = "";
} else { // x=y
- part[0] = nwcmd.substring(0, eqIndex).trim();
- part[1] = nwcmd.substring(eqIndex + 1).trim();
+ part[0] = setCmdArgs.substring(0, eqIndex).trim();
+ part[1] = setCmdArgs.substring(eqIndex + 1).trim();
if (!startWithHiveSpecialVariablePrefix(part[0])) {
// TODO:
// currently, for the command set key=value, we will fall to
@@ -294,7 +296,7 @@ public class HiveParser extends ParserImpl {
}
return new HiveSetOperation(part[0], part[1]);
}
- return new HiveSetOperation(nwcmd);
+ return new HiveSetOperation(setCmdArgs);
}
/**
@@ -306,6 +308,27 @@ public class HiveParser extends ParserImpl {
return new VariableSubstitution(() -> hiveVariables).substitute(conf, statement);
}
+ private Operation processAddCmd(String addCmdArgs) {
+ String[] tokens = addCmdArgs.split("\\s+");
+ SessionState.ResourceType resourceType = SessionState.find_resource_type(tokens[0]);
+ if (resourceType == SessionState.ResourceType.FILE) {
+ throw new UnsupportedOperationException(
+ "ADD FILE is not supported yet. Usage: add JAR <value>");
+ } else if (resourceType == SessionState.ResourceType.ARCHIVE) {
+ throw new UnsupportedOperationException(
+ "Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+ } else if (resourceType == SessionState.ResourceType.JAR) {
+ if (tokens.length != 2) {
+ throw new UnsupportedOperationException(
+ "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+ }
+ return new AddJarOperation(tokens[1]);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Unknown resource type: %s.", tokens[0]));
+ }
+ }
+
private List<Operation> processCmd(
String cmd, HiveConf hiveConf, HiveShim hiveShim, HiveCatalog hiveCatalog) {
try {
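To summarize the control flow added above, here is a self-contained sketch of the non-SQL command dispatch. It is illustrative only: it uses plain strings instead of HiveCommand, SessionState.ResourceType and the Flink operation classes, and it skips the hiveconf variable substitution step that the real parser applies before processAddCmd:

    public class HiveNonSqlDispatchSketch {
        static String describe(String statement) {
            String[] tokens = statement.split("\\s+");
            // The SQL CLI does not strip a trailing ";", so drop it from the arguments.
            String cmdArgs = statement.substring(tokens[0].length()).trim();
            if (cmdArgs.endsWith(";")) {
                cmdArgs = cmdArgs.substring(0, cmdArgs.length() - 1);
            }
            switch (tokens[0].toUpperCase()) {
                case "SET":
                    return "HiveSetOperation(" + cmdArgs + ")";
                case "RESET":
                    return "delegate RESET to the default Flink parser";
                case "ADD":
                    String[] addTokens = cmdArgs.split("\\s+");
                    if (!"JAR".equalsIgnoreCase(addTokens[0])) {
                        throw new UnsupportedOperationException(
                                "Only ADD JAR is supported. Usage: add JAR <value>");
                    }
                    if (addTokens.length != 2) {
                        throw new UnsupportedOperationException(
                                "Only a single jar per ADD JAR statement is supported");
                    }
                    return "AddJarOperation(" + addTokens[1] + ")";
                default:
                    throw new UnsupportedOperationException(
                            "Unsupported Hive command: " + tokens[0]);
            }
        }

        public static void main(String[] args) {
            System.out.println(describe("add jar test.jar;")); // AddJarOperation(test.jar)
        }
    }
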
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 0285fc3a4c7..4ecc4d7ea68 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
import org.apache.flink.table.functions.hive.util.TestSplitUDTFInitializeWithStructObjectInspector;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ClearOperation;
import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
@@ -1144,6 +1145,33 @@ public class HiveDialectITCase {
assertThat(result.toString()).isEqualTo(String.format("[+I[%s]]", path));
}
+ @Test
+ public void testAddCommand() {
+ TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
+ Parser parser = tableEnvInternal.getParser();
+
+ // test add jar
+ Operation operation = parser.parse("add jar test.jar").get(0);
+ assertThat(operation).isInstanceOf(AddJarOperation.class);
+ assertThat(((AddJarOperation) operation).getPath()).isEqualTo("test.jar");
+ // test add jar with variable substitute
+ operation = parser.parse("add jar \"${hiveconf:common-key}.jar\"").get(0);
+ assertThat(operation).isInstanceOf(AddJarOperation.class);
+ assertThat(((AddJarOperation) operation).getPath()).isEqualTo("\"common-val.jar\"");
+
+ // test unsupported add command
+ assertThatThrownBy(() -> tableEnv.executeSql("add jar t1.jar t2.jar"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Add multiple jar in one single statement is not supported yet. Usage: add JAR <value>");
+ assertThatThrownBy(() -> tableEnv.executeSql("add File t1.txt"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("ADD FILE is not supported yet. Usage: add JAR <value>");
+ assertThatThrownBy(() -> tableEnv.executeSql("add Archive t1.tgz"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Add ARCHIVE is not supported yet. Usage: add JAR <value>");
+ }
+
@Test
public void testUnsupportedOperation() {
List<String> statements =
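The second test case above relies on Hive's ${hiveconf:...} substitution running before the ADD command is processed (the substituteVariables call in the parser). A rough, self-contained approximation of that step, assuming the variables come from a plain map rather than the real HiveConf, and resolving only the hiveconf namespace (Hive's VariableSubstitution also handles system:, env: and hivevar:):

    import java.util.Collections;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Simplified stand-in for Hive's VariableSubstitution; not the real implementation.
    public class HiveConfSubstitutionSketch {
        private static final Pattern HIVECONF_VAR = Pattern.compile("\\$\\{hiveconf:([^}]+)}");

        static String substitute(String statement, Map<String, String> hiveConfVars) {
            Matcher matcher = HIVECONF_VAR.matcher(statement);
            StringBuffer result = new StringBuffer();
            while (matcher.find()) {
                // Unknown keys are left as-is; known keys are replaced by their value.
                String replacement =
                        hiveConfVars.getOrDefault(matcher.group(1), matcher.group(0));
                matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
            }
            matcher.appendTail(result);
            return result.toString();
        }

        public static void main(String[] args) {
            // Mirrors the test above: "${hiveconf:common-key}.jar" becomes "common-val.jar".
            System.out.println(substitute(
                    "add jar \"${hiveconf:common-key}.jar\"",
                    Collections.singletonMap("common-key", "common-val")));
        }
    }
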
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index e31ee990058..60a1e76b69b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -78,6 +78,16 @@ import static org.assertj.core.api.Assertions.assertThat;
@RunWith(Parameterized.class)
public class CliClientITCase extends AbstractTestBase {
+ private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
+ private static final String HIVE_ADD_ONE_UDF_CODE =
+ "public class "
+ + HIVE_ADD_ONE_UDF_CLASS
+ + " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
+ + " public int evaluate(int content) {\n"
+ + " return content + 1;\n"
+ + " }"
+ + "}\n";
+
private static Path historyPath;
private static Map<String, String> replaceVars;
@@ -109,6 +119,7 @@ public class CliClientITCase extends AbstractTestBase {
classNameCodes.put(
GENERATED_UPPER_UDF_CLASS,
String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+ classNameCodes.put(HIVE_ADD_ONE_UDF_CLASS, HIVE_ADD_ONE_UDF_CODE);
File udfJar =
UserClassLoaderJarTestUtils.createJarFile(
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 06edfa63997..29d1a7eb0ef 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -61,6 +61,36 @@ CREATE TABLE foo as select 1;
1 row in set
!ok
+# test add jar
+ADD JAR $VAR_UDF_JAR_PATH;
+[INFO] The specified jar is added into session classloader.
+!info
+
+SHOW JARS;
+$VAR_UDF_JAR_PATH
+!ok
+
+CREATE FUNCTION hive_add_one as 'HiveAddOneFunc';
+[INFO] Execute statement succeed.
+!info
+
+SELECT hive_add_one(1);
++----+-------------+
+| op | _o__c0 |
++----+-------------+
+| +I | 2 |
++----+-------------+
+Received a total of 1 row
+!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok
+
# list the configured configuration
set;
'execution.attached' = 'true'