You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by fe...@apache.org on 2017/03/08 07:22:09 UTC

zeppelin git commit: [MINOR] Use standard java API to interrupt thread

Repository: zeppelin
Updated Branches:
  refs/heads/master 142597bcf -> b41997850


[MINOR] Use standard java API to interrupt thread

### What is this PR for?
Use the standard Java `Thread.interrupt` method to stop the job progress polling thread.

The standard API is:
* properly synchronized
* able to interrupt `Thread.sleep` regardless of the sleep interval

### What type of PR is it?
[Refactoring]

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Igor Drozdov <ig...@epam.com>

Closes #2039 from DrIgor/refactor-progress-poller and squashes the following commits:

895787f [Igor Drozdov] Use standard java API to interrupt thread


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b4199785
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b4199785
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b4199785

Branch: refs/heads/master
Commit: b41997850990c8ed99bb8d3d67c9a64371da7b84
Parents: 142597b
Author: Igor Drozdov <ig...@epam.com>
Authored: Mon Feb 20 10:04:12 2017 +0300
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Mar 7 23:22:06 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/zeppelin/scheduler/Job.java | 12 ++---
 .../zeppelin/scheduler/JobProgressPoller.java   | 49 +++++++++-----------
 2 files changed, 26 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 76d90b9..0e9dbeb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -182,22 +182,16 @@ public abstract class Job {
       this.exception = null;
       errorMessage = null;
       dateFinished = new Date();
-      progressUpdator.terminate();
-    } catch (NullPointerException e) {
-      LOGGER.error("Job failed", e);
-      progressUpdator.terminate();
-      this.exception = e;
-      setResult(e.getMessage());
-      errorMessage = getStack(e);
-      dateFinished = new Date();
     } catch (Throwable e) {
       LOGGER.error("Job failed", e);
-      progressUpdator.terminate();
       this.exception = e;
       setResult(e.getMessage());
       errorMessage = getStack(e);
       dateFinished = new Date();
     } finally {
+      if (progressUpdator != null) {
+        progressUpdator.interrupt();
+      }
       //aborted = false;
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
index 967702a..8b8cda0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
@@ -21,48 +21,45 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Polls job progress with given interval
+ *
+ * @see Job#progress()
+ * @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int)
+ *
  * TODO(moon) : add description.
  */
 public class JobProgressPoller extends Thread {
   public static final long DEFAULT_INTERVAL_MSEC = 500;
-  Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+  private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+
   private Job job;
   private long intervalMs;
-  boolean terminate = false;
 
   public JobProgressPoller(Job job, long intervalMs) {
+    super("JobProgressPoller, jobId=" + job.getId());
     this.job = job;
-    this.intervalMs = intervalMs;
+    if (intervalMs < 0) {
+      throw new IllegalArgumentException("polling interval can't be " + intervalMs);
+    }
+    this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs;
   }
 
   @Override
   public void run() {
-    if (intervalMs < 0) {
-      return;
-    } else if (intervalMs == 0) {
-      intervalMs = DEFAULT_INTERVAL_MSEC;
-    }
-
-    while (terminate == false) {
-      JobListener listener = job.getListener();
-      if (listener != null) {
-        try {
-          if (job.isRunning()) {
-            listener.onProgressUpdate(job, job.progress());
+    try {
+      while (!Thread.interrupted()) {
+        JobListener listener = job.getListener();
+        if (listener != null) {
+          try {
+            if (job.isRunning()) {
+              listener.onProgressUpdate(job, job.progress());
+            }
+          } catch (Exception e) {
+            logger.error("Can not get or update progress", e);
           }
-        } catch (Exception e) {
-          logger.error("Can not get or update progress", e);
         }
-      }
-      try {
         Thread.sleep(intervalMs);
-      } catch (InterruptedException e) {
-        logger.error("Exception in JobProgressPoller while run Thread.sleep", e);
       }
-    }
-  }
-
-  public void terminate() {
-    terminate = true;
+    } catch (InterruptedException ignored) {}
   }
 }