Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/29 14:51:47 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4886] [flink] Add property to start flink job from savepoint

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 5e443f5  [ZEPPELIN-4886] [flink] Add property to start flink job from savepoint
5e443f5 is described below

commit 5e443f5668998b25c3a9265dad60185655d7d1d7
Author: 1721563048@qq.com <a18652727118>
AuthorDate: Fri Jun 26 10:26:46 2020 +0800

    [ZEPPELIN-4886] [flink] Add property to start flink job from savepoint
    
    ### What is this PR for?
    Add property to start flink job from savepoint
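
    For example, the new local property can be set on a streaming sql
    paragraph like this (an illustrative sketch; the savepoint path is
    hypothetical):

        %flink.ssql(type=update, savepointPath=/tmp/flink-savepoints/savepoint-1a2b3c)
        select url, count(1) as pv from log group by url;

    Unlike savepointDir, which triggers a savepoint when the job is
    canceled and resumes from it later, savepointPath only resumes from a
    savepoint that already exists, and it takes precedence over
    savepointDir.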
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    [ZEPPELIN-4886] https://issues.apache.org/jira/browse/ZEPPELIN-4886
    
    ### How should this be tested?
    GUI integration tests
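
    To verify manually (a sketch; the directory name is hypothetical):
    run a streaming sql paragraph with savepointDir set, e.g.

        %flink.ssql(type=update, savepointDir=/tmp/flink-savepoints)
        select url, count(1) as pv from log group by url;

    cancel the paragraph so a savepoint is written under that directory,
    then rerun it with savepointPath pointing at the written savepoint,
    as in the example above. The unit tests below automate this flow.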
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    No Questions.
    
    Author: 1721563048@qq.com <a18652727118>
    
    Closes #3818 from lonelyGhostisdog/ZEPPELIN-4886 and squashes the following commits:
    
    6c84b86c8 [1721563048@qq.com] [ZEPPELIN-4886] Remove System.out and add test for resume from invalid savepointPath
    c883037fc [1721563048@qq.com] [ZEPPELIN-4886] Add test for resume from exist savepointPath
    166ac4942 [1721563048@qq.com] [ZEPPELIN-4886] Add docs for the new property
    d69bd93be [1721563048@qq.com] [ZEPPELIN-4886] Add property to start flink job from savepoint.
    
    (cherry picked from commit 933ca6baa98998a37c14a00161ec23f04d0a80bc)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 docs/interpreter/flink.md                          |  5 ++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  8 +++
 .../flink/FlinkStreamSqlInterpreterTest.java       | 83 ++++++++++++++++++++++
 3 files changed, 96 insertions(+)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 108d7b5..5eb7f5c 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -457,6 +457,11 @@ In this section, we will list and explain all the supported local properties in
     <td>If you specify it, then when you cancel your flink job in Zeppelin, it would also do savepoint and store state in this directory. And when you resume your job, it would resume from this savepoint.</td>
   </tr>
   <tr>
+    <td>savepointPath</td>
+    <td></td>
+    <td>If you specify it, the job will resume from this savepoint path. It takes precedence over the savepoint stored via savepointDir.</td>
+  </tr>
+  <tr>
     <td>runAsOne</td>
     <td>false</td>
     <td>All the insert into sql will run in a single flink job if this is true.</td>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1175b7b..1307108 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -647,6 +647,14 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def setSavePointIfNecessary(context: InterpreterContext): Unit = {
     val savepointDir = context.getLocalProperties.get("savepointDir")
+    val savepointPath = context.getLocalProperties.get("savepointPath")
+
+    if (!StringUtils.isBlank(savepointPath)) {
+      LOGGER.info("savepointPath has been set by user, savepointPath = {}", savepointPath)
+      configuration.setString("execution.savepoint.path", savepointPath)
+      return
+    }
+
     if (!StringUtils.isBlank(savepointDir)) {
       val savepointPath = z.angular(context.getParagraphId + "_savepointpath", context.getNoteId, null)
       if (savepointPath == null) {
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 55f229a..aa0fac7 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
@@ -254,6 +255,88 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   }
 
   @Test
+  public void testResumeStreamSqlFromExistingSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+    String initStreamScalaScript = getInitStreamScript(1000);
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File savePointDir = FileUtils.getTempDirectory();
+    final Waiter waiter = new Waiter();
+    Thread thread = new Thread(() -> {
+      try {
+        InterpreterContext context = getInterpreterContext();
+        context.getLocalProperties().put("type", "update");
+        context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("parallelism", "1");
+        context.getLocalProperties().put("maxParallelism", "10");
+        InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
+                "log group by url", context);
+        waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+        waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+      } catch (Exception e) {
+        e.printStackTrace();
+        waiter.fail("Should not fail here");
+      }
+      waiter.resume();
+    });
+    thread.start();
+
+    // the streaming job will run for 20 seconds. check init_stream.scala
+    // sleep 10 seconds to make sure the job is started but not finished
+    Thread.sleep(10 * 1000);
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "update");
+    context.getLocalProperties().put("savepointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("parallelism", "2");
+    context.getLocalProperties().put("maxParallelism", "10");
+    sqlInterpreter.cancel(context);
+    waiter.await(10 * 1000);
+
+    // get an existing savepoint path from the temp directory;
+    // if there is more than one savepoint dir, use the first one
+    String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
+    assertTrue(allSavepointPath.length > 0);
+
+    String savepointPath = savePointDir.getAbsolutePath().concat(File.separator).concat(allSavepointPath[0]);
+
+    // resume the job from the existing savepointPath
+    context.getLocalProperties().put("savepointPath", savepointPath);
+    InterpreterResult result3 = sqlInterpreter.interpret("select url, count(1) as pv from " +
+            "log group by url", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result3.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+
+  }
+
+  @Test
+  public void testResumeStreamSqlFromInvalidSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+    String initStreamScalaScript = getInitStreamScript(1000);
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File savepointPath = FileUtils.getTempDirectory();
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "update");
+    context.getLocalProperties().put("savepointPath", savepointPath.getAbsolutePath());
+    context.getLocalProperties().put("parallelism", "1");
+    context.getLocalProperties().put("maxParallelism", "10");
+    InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
+            "log group by url", context);
+
+    // due to the invalid savepointPath, the job fails to submit and throws an exception
+    assertEquals(InterpreterResult.Code.ERROR, result2.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertTrue(resultMessages.toString().contains("Failed to submit job."));
+
+  }
+
+  @Test
   public void testStreamUDF() throws IOException, InterpreterException {
     String initStreamScalaScript = getInitStreamScript(100);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,