You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/11/08 15:14:48 UTC

zeppelin git commit: [ZEPPELIN-1632] Add the possibility to cancel flink jobs in local mode

Repository: zeppelin
Updated Branches:
  refs/heads/master 1922a9cb5 -> 34621717d


[ZEPPELIN-1632] Add the possibility to cancel flink jobs in local mode

### What is this PR for?
Especially for long-running Flink streaming jobs, it should be possible to cancel a job by clicking the cancel button in the Zeppelin UI. This pull request implements job cancellation for the case where the Flink interpreter is running in local mode.

### What type of PR is it?
Improvement

### Todos
* [x] - Implement cancel method

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1632

### How should this be tested?
Execute a long-running Flink streaming job.
The "senv.execute()" method is blocking, so the paragraph remains in the running state. Then click cancel, and the job will be canceled.

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Philipp <ph...@hotmail.de>

Closes #1607 from PhilippGrulich/flink_cancel_job and squashes the following commits:

9d00190 [Philipp] add the possibility to cancel flink jobs in local mode


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/34621717
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/34621717
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/34621717

Branch: refs/heads/master
Commit: 34621717df173749f677d02ace334870d06f529d
Parents: 1922a9c
Author: Philipp <ph...@hotmail.de>
Authored: Mon Nov 7 12:20:50 2016 +0100
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Nov 8 07:14:47 2016 -0800

----------------------------------------------------------------------
 .../apache/zeppelin/flink/FlinkInterpreter.java  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/34621717/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 5e2a60d..8b9b4ec 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -27,8 +27,12 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.*;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -47,6 +51,7 @@ import scala.Option;
 import scala.Some;
 import scala.collection.JavaConversions;
 import scala.collection.immutable.Nil;
+import scala.concurrent.duration.FiniteDuration;
 import scala.runtime.AbstractFunction0;
 import scala.tools.nsc.Settings;
 import scala.tools.nsc.interpreter.IMain;
@@ -341,6 +346,20 @@ public class FlinkInterpreter extends Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
+    if (localMode()) {
+      // In localMode we can cancel all running jobs,
+      // because the local cluster can only run one job at the time.
+      for (JobID job : this.localFlinkCluster.getCurrentlyRunningJobsJava()) {
+        logger.info("Stop job: " + job);
+        cancelJobLocalMode(job);
+      }
+    }
+  }
+
+  private void cancelJobLocalMode(JobID jobID){
+    FiniteDuration timeout = AkkaUtils.getTimeout(this.localFlinkCluster.configuration());
+    ActorGateway leader = this.localFlinkCluster.getLeaderGateway(timeout);
+    leader.ask(new JobManagerMessages.CancelJob(jobID), timeout);
   }
 
   @Override