You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/10 09:14:48 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4865]. Allow
specify jobName as paragraph local properties
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new d2b6618 [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
d2b6618 is described below
commit d2b66188232fd1481b3c1196adf1defe847d07c6
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jun 6 23:35:26 2020 +0800
[ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
### What is this PR for?
Minor PR which allows users to specify the job name as a paragraph local property (`jobName`); if the user doesn't specify it, the job name defaults to the SQL statement.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-4865
### How should this be tested?
* CI passes and manually tested.
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/84017337-15e73180-a9b1-11ea-94de-baf605d6a422.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3792 from zjffdu/ZEPPELIN-4865 and squashes the following commits:
7c6e2f5c9 [Jeff Zhang] [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
(cherry picked from commit 6bea350b804586ec5365603da27e62265986608e)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../src/main/java/org/apache/zeppelin/flink/FlinkShims.java | 2 +-
.../src/main/java/org/apache/zeppelin/flink/Flink110Shims.java | 4 ++--
.../src/main/java/org/apache/zeppelin/flink/Flink111Shims.java | 2 +-
.../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 6 ++++--
.../java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java | 5 +++--
5 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index ef5f0a0..274bf2c 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -89,7 +89,7 @@ public abstract class FlinkShims {
public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;
- public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception;
+ public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception;
public abstract boolean rowEquals(Object row1, Object row2);
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index dec3560..f6d506a 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -109,8 +109,8 @@ public class Flink110Shims extends FlinkShims {
}
@Override
- public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
- ((TableEnvironment) tblEnv).execute(sql);
+ public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
+ ((TableEnvironment) tblEnv).execute(jobName);
return true;
}
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index ea11ced..2480c69 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -116,7 +116,7 @@ public class Flink111Shims extends FlinkShims {
}
@Override
- public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+ public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
while(!jobClient.getJobStatus().get().isTerminalState()) {
LOGGER.debug("Wait for job to finish");
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 1e8e803..f2d31b6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -223,7 +223,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
if (runAsOne) {
try {
lock.lock();
- if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) {
+ String jobName = context.getStringLocalProperty("jobName", st);
+ if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) {
context.out.write("Insertion successfully.\n");
}
} catch (Exception e) {
@@ -532,7 +533,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
if (!runAsOne) {
this.tbenv.sqlUpdate(sql);
- this.tbenv.execute(sql);
+ String jobName = context.getStringLocalProperty("jobName", sql);
+ this.tbenv.execute(jobName);
context.out.write("Insertion successfully.\n");
} else {
flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 2d98ef7..a8728d3 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -157,8 +157,9 @@ public abstract class AbstractStreamSqlJob {
retrievalThread.start();
LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
- stenv.execute(tableName);
- LOGGER.info("Flink Job is finished, tableName: " + tableName);
+ String jobName = context.getStringLocalProperty("jobName", tableName);
+ stenv.execute(jobName);
+ LOGGER.info("Flink Job is finished, jobName: " + jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
retrievalThread.join();