You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/10 09:14:48 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new d2b6618  [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
d2b6618 is described below

commit d2b66188232fd1481b3c1196adf1defe847d07c6
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jun 6 23:35:26 2020 +0800

    [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
    
    ### What is this PR for?
    
    Minor PR that allows users to specify the job name via the paragraph local property `jobName`. If the user does not specify it, the job name defaults to the SQL statement.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/browse/ZEPPELIN-4865
    
    ### How should this be tested?
    * CI pass and manually tested.
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/84017337-15e73180-a9b1-11ea-94de-baf605d6a422.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3792 from zjffdu/ZEPPELIN-4865 and squashes the following commits:
    
    7c6e2f5c9 [Jeff Zhang] [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties
    
    (cherry picked from commit 6bea350b804586ec5365603da27e62265986608e)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../src/main/java/org/apache/zeppelin/flink/FlinkShims.java         | 2 +-
 .../src/main/java/org/apache/zeppelin/flink/Flink110Shims.java      | 4 ++--
 .../src/main/java/org/apache/zeppelin/flink/Flink111Shims.java      | 2 +-
 .../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java    | 6 ++++--
 .../java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java    | 5 +++--
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index ef5f0a0..274bf2c 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -89,7 +89,7 @@ public abstract class FlinkShims {
 
   public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;
 
-  public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception;
+  public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception;
 
   public abstract boolean rowEquals(Object row1, Object row2);
 
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index dec3560..f6d506a 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -109,8 +109,8 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
-  public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
-    ((TableEnvironment) tblEnv).execute(sql);
+  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
+    ((TableEnvironment) tblEnv).execute(jobName);
     return true;
   }
 
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index ea11ced..2480c69 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -116,7 +116,7 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
-  public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception {
+  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
     JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
     while(!jobClient.getJobStatus().get().isTerminalState()) {
       LOGGER.debug("Wait for job to finish");
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 1e8e803..f2d31b6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -223,7 +223,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       if (runAsOne) {
         try {
           lock.lock();
-          if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) {
+          String jobName = context.getStringLocalProperty("jobName", st);
+          if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) {
             context.out.write("Insertion successfully.\n");
           }
         } catch (Exception e) {
@@ -532,7 +533,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
        boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
        if (!runAsOne) {
          this.tbenv.sqlUpdate(sql);
-         this.tbenv.execute(sql);
+         String jobName = context.getStringLocalProperty("jobName", sql);
+         this.tbenv.execute(jobName);
          context.out.write("Insertion successfully.\n");
        } else {
          flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 2d98ef7..a8728d3 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -157,8 +157,9 @@ public abstract class AbstractStreamSqlJob {
       retrievalThread.start();
 
       LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
-      stenv.execute(tableName);
-      LOGGER.info("Flink Job is finished, tableName: " + tableName);
+      String jobName = context.getStringLocalProperty("jobName", tableName);
+      stenv.execute(jobName);
+      LOGGER.info("Flink Job is finished, jobName: " + jobName);
       // wait for retrieve thread consume all data
       LOGGER.info("Waiting for retrieve thread to be done");
       retrievalThread.join();