Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/13 09:13:17 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4990] execution.savepoint.path doesn't work in %flink.conf

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new d765291  [ZEPPELIN-4990] execution.savepoint.path doesn't work in %flink.conf
d765291 is described below

commit d7652919312c6fa705f8e8452cd853d98bcd6d32
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Aug 11 22:52:45 2020 +0800

    [ZEPPELIN-4990] execution.savepoint.path doesn't work in %flink.conf
    
    ### What is this PR for?
    
    This is a trivial PR that fixes the issue of `execution.savepoint.path` not working in `%flink.conf`. After this PR,
    we set `execution.savepoint.path` in the following order (see the sketch after this list):

    1. Use the savepoint path stored in the paragraph config; this is recorded by Zeppelin when the paragraph is canceled.
    2. Use the checkpoint path stored in the paragraph config; this is recorded by Zeppelin in the Flink job progress poller.
    3. Use the local property 'execution.savepoint.path' if the user set it.
    4. Otherwise remove 'execution.savepoint.path' when the user didn't specify it in `%flink.conf`.
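
    A minimal sketch of this resolution order (illustrative names only, not the patch itself; the real
    logic lives in `FlinkScalaInterpreter.setSavepointPathIfNecessary`, and the `resumeFromSavepoint` /
    `resumeFromCheckpoint` flags that gate steps 1 and 2 are omitted here for brevity):

        // Standalone Scala sketch of the fallback chain for 'execution.savepoint.path'.
        object SavepointPathResolutionSketch {
          // Returns Some(path) to set on the Flink configuration, or None to remove the key.
          def resolve(paragraphSavepoint: Option[String],  // 1. recorded when the paragraph is canceled
                      paragraphCheckpoint: Option[String], // 2. recorded by the job progress poller
                      localProperty: Option[String],       // 3. paragraph local property
                      flinkConfValue: Option[String]       // 4. value set via %flink.conf
                     ): Option[String] = {
            paragraphSavepoint.filter(_.nonEmpty)
              .orElse(paragraphCheckpoint.filter(_.nonEmpty))
              .orElse(localProperty.filter(_.nonEmpty))
              .orElse(flinkConfValue.filter(_.nonEmpty)) // None here means the stale key is removed
          }

          def main(args: Array[String]): Unit = {
            println(resolve(None, None, None, None))                                   // None -> key removed
            println(resolve(None, None, None, Some("/sp/from-conf")))                  // Some(/sp/from-conf)
            println(resolve(Some("/sp/on-cancel"), None, None, Some("/sp/from-conf"))) // Some(/sp/on-cancel)
          }
        }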
    
    ### What type of PR is it?
    Bug Fix
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4990
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3876 from zjffdu/ZEPPELIN-4990 and squashes the following commits:
    
    a78f8c00d [Jeff Zhang] [ZEPPELIN-4990]. execution.savepoint.path doesn't work in %flink.conf
    365ecf815 [Jeff Zhang] [minor] change default value of resumeFromSavepoint to false
    
    (cherry picked from commit b3f5ee9fb698f67f579f2dbf8b512806f61bb5a9)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 docs/interpreter/flink.md                          |  5 ++++
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  6 ++---
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java |  9 +------
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |  3 ---
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  2 +-
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  2 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 29 ++++++++++++++++------
 7 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 2eaea6d..577379c 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -457,6 +457,11 @@ In this section, we will list and explain all the supported local properties in
     <td>If you specify it, then when you cancel your flink job in Zeppelin, it would also do savepoint and store state in this directory. And when you resume your job, it would resume from this savepoint.</td>
   </tr>
   <tr>
+    <td>execution.savepoint.path</td>
+    <td></td>
+    <td>When you resume your job, it will resume from this savepoint path.</td>
+  </tr>
+  <tr>
     <td>resumeFromSavepoint</td>
     <td></td>
     <td>Resume flink job from savepoint if you specify savepointDir.</td>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d4cf347..d79fb2e 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -88,7 +88,7 @@ public class FlinkInterpreter extends Interpreter {
       Thread.currentThread().setContextClassLoader(getFlinkScalaShellLoader());
       createPlannerAgain();
       setParallelismIfNecessary(context);
-      setSavePointIfNecessary(context);
+      setSavepointIfNecessary(context);
       return innerIntp.interpret(st, context);
     } finally {
       Thread.currentThread().setContextClassLoader(originClassLoader);
@@ -181,8 +181,8 @@ public class FlinkInterpreter extends Interpreter {
     return this.innerIntp.getFlinkShims();
   }
 
-  public void setSavePointIfNecessary(InterpreterContext context) {
-    this.innerIntp.setSavePointPathIfNecessary(context);
+  public void setSavepointIfNecessary(InterpreterContext context) {
+    this.innerIntp.setSavepointPathIfNecessary(context);
   }
 
   public void setParallelismIfNecessary(InterpreterContext context) {
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index ab497c3..81a0bce 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -26,12 +26,10 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.python.PythonOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -39,17 +37,12 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.util.SqlSplitter;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -117,7 +110,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
       flinkInterpreter.createPlannerAgain();
       flinkInterpreter.setParallelismIfNecessary(context);
-      flinkInterpreter.setSavePointIfNecessary(context);
+      flinkInterpreter.setSavepointIfNecessary(context);
       return runSqlList(st, context);
     } finally {
       Thread.currentThread().setContextClassLoader(originClassLoader);
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index c027341..dd85272 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -56,9 +56,6 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
 
   @Override
   public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.setSavePointIfNecessary(context);
-    flinkInterpreter.setParallelismIfNecessary(context);
-
     String streamType = context.getLocalProperties().get("type");
     if (streamType == null) {
       throw new IOException("type must be specified for stream sql");
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index a875e61..9271cd5 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -89,7 +89,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
         throw new InterpreterException("Fail to initJavaThread: " +
                 result.toString());
       }
-      flinkInterpreter.setSavePointIfNecessary(context);
+      flinkInterpreter.setSavepointIfNecessary(context);
       flinkInterpreter.setParallelismIfNecessary(context);
       return super.internalInterpret(st, context);
     } finally {
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 019e6d6..cc9ec7f 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -115,7 +115,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
                     result.toString());
           }
         }
-        flinkInterpreter.setSavePointIfNecessary(context);
+        flinkInterpreter.setSavepointIfNecessary(context);
         flinkInterpreter.setParallelismIfNecessary(context);
       }
       return super.interpret(st, context);
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 7157019..b2dfb7d 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -641,16 +641,21 @@ class FlinkScalaInterpreter(val properties: Properties) {
   }
 
   /**
-   * Use savepoint first, if no savepoint, then use checkpoint.
+   * Set execution.savepoint.path in the following order:
+   *
+   * 1. Use the savepoint path stored in the paragraph config; this is recorded by Zeppelin when the paragraph is canceled.
+   * 2. Use the checkpoint path stored in the paragraph config; this is recorded by Zeppelin in the Flink job progress poller.
+   * 3. Use the local property 'execution.savepoint.path' if the user set it.
+   * 4. Otherwise remove 'execution.savepoint.path' when the user didn't specify it in %flink.conf.
    *
    * @param context
    */
-  def setSavePointPathIfNecessary(context: InterpreterContext): Unit = {
+  def setSavepointPathIfNecessary(context: InterpreterContext): Unit = {
     val savepointPath = context.getConfig.getOrDefault(JobManager.SAVEPOINT_PATH, "").toString
     val resumeFromSavepoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_SAVEPOINT, false)
     if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint){
       LOGGER.info("Resume job from savepoint , savepointPath = {}", savepointPath)
-      configuration.setString("execution.savepoint.path", savepointPath)
+      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), savepointPath)
       return
     }
 
@@ -658,13 +663,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val resumeFromLatestCheckpoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_CHECKPOINT, false)
     if (!StringUtils.isBlank(checkpointPath) && resumeFromLatestCheckpoint) {
       LOGGER.info("Resume job from checkpoint , checkpointPath = {}", checkpointPath)
-      configuration.setString("execution.savepoint.path", checkpointPath)
+      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath)
       return
     }
 
-    // remove the SAVEPOINT_PATH which may be set by last job.
-    LOGGER.info("Start flink job without any checkpoint and savepoint, remove execution.savepoint.path")
-    configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
+    val userSavepointPath = context.getLocalProperties.getOrDefault(
+      SavepointConfigOptions.SAVEPOINT_PATH.key(), "")
+    if (!StringUtils.isBlank(userSavepointPath)) {
+      LOGGER.info("Resume job from user set savepoint , savepointPath = {}", userSavepointPath)
+      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath)
+      return;
+    }
+
+    val userSettingSavepointPath = properties.getProperty(SavepointConfigOptions.SAVEPOINT_PATH.key())
+    if (StringUtils.isBlank(userSettingSavepointPath)) {
+      // remove SAVEPOINT_PATH when user didn't set it via %flink.conf
+      configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
+    }
   }
 
   def setParallelismIfNecessary(context: InterpreterContext): Unit = {
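
To make the behaviour above concrete, a hypothetical usage sketch, assuming the standard Zeppelin
%flink.conf key/value syntax and the paragraph local-property syntax documented in
docs/interpreter/flink.md (the savepoint path is illustrative):

    %flink.conf
    execution.savepoint.path /tmp/flink/savepoints/savepoint-xxxx

or, per paragraph, via a local property:

    %flink.ssql(execution.savepoint.path=/tmp/flink/savepoints/savepoint-xxxx)
    select ...

With this change, a value set in %flink.conf is kept when the paragraph has no recorded savepoint or
checkpoint and no local property; if none of these are set, the key is removed so a stale path from a
previous run is not reused.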