You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by fe...@apache.org on 2017/03/08 07:22:09 UTC
zeppelin git commit: [MINOR] Use standard java API to interrupt thread
Repository: zeppelin
Updated Branches:
refs/heads/master 142597bcf -> b41997850
[MINOR] Use standard java API to interrupt thread
### What is this PR for?
Use the Java `Thread.interrupt` method to stop the job progress polling thread.
Standard API is:
* properly synchronized
* able to interrupt `Thread.sleep` with any interval
### What type of PR is it?
[Refactoring]
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: Igor Drozdov <ig...@epam.com>
Closes #2039 from DrIgor/refactor-progress-poller and squashes the following commits:
895787f [Igor Drozdov] Use standard java API to interrupt thread
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b4199785
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b4199785
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b4199785
Branch: refs/heads/master
Commit: b41997850990c8ed99bb8d3d67c9a64371da7b84
Parents: 142597b
Author: Igor Drozdov <ig...@epam.com>
Authored: Mon Feb 20 10:04:12 2017 +0300
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Mar 7 23:22:06 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/zeppelin/scheduler/Job.java | 12 ++---
.../zeppelin/scheduler/JobProgressPoller.java | 49 +++++++++-----------
2 files changed, 26 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 76d90b9..0e9dbeb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -182,22 +182,16 @@ public abstract class Job {
this.exception = null;
errorMessage = null;
dateFinished = new Date();
- progressUpdator.terminate();
- } catch (NullPointerException e) {
- LOGGER.error("Job failed", e);
- progressUpdator.terminate();
- this.exception = e;
- setResult(e.getMessage());
- errorMessage = getStack(e);
- dateFinished = new Date();
} catch (Throwable e) {
LOGGER.error("Job failed", e);
- progressUpdator.terminate();
this.exception = e;
setResult(e.getMessage());
errorMessage = getStack(e);
dateFinished = new Date();
} finally {
+ if (progressUpdator != null) {
+ progressUpdator.interrupt();
+ }
//aborted = false;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b4199785/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
index 967702a..8b8cda0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/JobProgressPoller.java
@@ -21,48 +21,45 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * Polls job progress with given interval
+ *
+ * @see Job#progress()
+ * @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int)
+ *
* TODO(moon) : add description.
*/
public class JobProgressPoller extends Thread {
public static final long DEFAULT_INTERVAL_MSEC = 500;
- Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+ private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+
private Job job;
private long intervalMs;
- boolean terminate = false;
public JobProgressPoller(Job job, long intervalMs) {
+ super("JobProgressPoller, jobId=" + job.getId());
this.job = job;
- this.intervalMs = intervalMs;
+ if (intervalMs < 0) {
+ throw new IllegalArgumentException("polling interval can't be " + intervalMs);
+ }
+ this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs;
}
@Override
public void run() {
- if (intervalMs < 0) {
- return;
- } else if (intervalMs == 0) {
- intervalMs = DEFAULT_INTERVAL_MSEC;
- }
-
- while (terminate == false) {
- JobListener listener = job.getListener();
- if (listener != null) {
- try {
- if (job.isRunning()) {
- listener.onProgressUpdate(job, job.progress());
+ try {
+ while (!Thread.interrupted()) {
+ JobListener listener = job.getListener();
+ if (listener != null) {
+ try {
+ if (job.isRunning()) {
+ listener.onProgressUpdate(job, job.progress());
+ }
+ } catch (Exception e) {
+ logger.error("Can not get or update progress", e);
}
- } catch (Exception e) {
- logger.error("Can not get or update progress", e);
}
- }
- try {
Thread.sleep(intervalMs);
- } catch (InterruptedException e) {
- logger.error("Exception in JobProgressPoller while run Thread.sleep", e);
}
- }
- }
-
- public void terminate() {
- terminate = true;
+ } catch (InterruptedException ignored) {}
}
}