You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/02/18 02:53:46 UTC
[zeppelin] branch master updated: [ZEPPELIN-4614]. Dead lock in
ZeppelinServer
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 95b8081 [ZEPPELIN-4614]. Dead lock in ZeppelinServer
95b8081 is described below
commit 95b808144e25926c89725e48741c6148ee0e34ef
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Feb 14 09:56:37 2020 +0800
[ZEPPELIN-4614]. Dead lock in ZeppelinServer
### What is this PR for?
This PR fixes the deadlock in ZeppelinServer. The deadlock occurs in Paragraph (see the JIRA issue for the jstack details). This PR also adds more logging.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4614
### How should this be tested?
* I ran 2 cron jobs (each triggered every 5 minutes) for more than 2 days, and the deadlock did not recur.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3652 from zjffdu/ZEPPELIN-4614 and squashes the following commits:
76f1d1706 [Jeff Zhang] [ZEPPELIN-4614]. Dead lock in ZeppelinServer
---
.../java/org/apache/zeppelin/scheduler/Job.java | 8 ++---
.../zeppelin/interpreter/InterpreterSetting.java | 2 +-
.../java/org/apache/zeppelin/notebook/Note.java | 22 ++++++++++++--
.../zeppelin/notebook/scheduler/CronJob.java | 35 ++++++++++------------
.../notebook/scheduler/QuartzSchedulerService.java | 19 ++++++++----
5 files changed, 54 insertions(+), 32 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index c36e371..66bfefa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -178,13 +178,13 @@ public abstract class Job<T> {
}
}
- private synchronized void completeWithSuccess(T result) {
+ private void completeWithSuccess(T result) {
setResult(result);
exception = null;
errorMessage = null;
}
- private synchronized void completeWithError(Throwable error) {
+ private void completeWithError(Throwable error) {
setException(error);
errorMessage = getJobExceptionStack(error);
}
@@ -201,11 +201,11 @@ public abstract class Job<T> {
}
}
- public synchronized Throwable getException() {
+ public Throwable getException() {
return exception;
}
- protected synchronized void setException(Throwable t) {
+ protected void setException(Throwable t) {
exception = t;
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 156bcb6..632937d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -885,7 +885,7 @@ public class InterpreterSetting {
// load dependencies
List<Dependency> deps = getDependencies();
- if (deps != null) {
+ if (deps != null && !deps.isEmpty()) {
LOGGER.info("Start to download dependencies for interpreter: " + name);
for (Dependency d : deps) {
File destDir = new File(
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 966f5ba..ba75a67 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -911,9 +911,7 @@ public class Note implements JsonSerializable {
}
public List<Paragraph> getParagraphs() {
- synchronized (paragraphs) {
- return new LinkedList<>(paragraphs);
- }
+ return this.paragraphs;
}
// TODO(zjffdu) how does this used ?
@@ -1006,6 +1004,24 @@ public class Note implements JsonSerializable {
}
/**
+ * Get InterpreterSetting used by the paragraphs of this note.
+ * @return
+ */
+ public List<InterpreterSetting> getUsedInterpreterSettings() {
+ Set<InterpreterSetting> settings = new HashSet<>();
+ for (Paragraph p : getParagraphs()) {
+ try {
+ Interpreter intp = p.getBindedInterpreter();
+ settings.add((
+ (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting());
+ } catch (InterpreterNotFoundException e) {
+ // ignore this
+ }
+ }
+ return new ArrayList<>(settings);
+ }
+
+ /**
* Return new note for specific user. this inserts and replaces user paragraph which doesn't
* exists in original paragraph
*
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
index bf15d8b..45133f9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
@@ -33,28 +33,27 @@ import org.slf4j.LoggerFactory;
/** Cron task for the note. */
public class CronJob implements org.quartz.Job {
- private static final Logger logger = LoggerFactory.getLogger(CronJob.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(CronJob.class);
@Override
public void execute(JobExecutionContext context) {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-
Notebook notebook = (Notebook) jobDataMap.get("notebook");
String noteId = jobDataMap.getString("noteId");
- logger.info("Start cron job of note: " + noteId);
+ LOGGER.info("Start cron job of note: " + noteId);
Note note = null;
try {
note = notebook.getNote(noteId);
if (note == null) {
- logger.warn("Note " + noteId + " not found");
+ LOGGER.warn("Skip cron job of note: " + noteId + ", because it is not found");
return;
}
} catch (IOException e) {
- logger.warn("Fail to get note: " + noteId, e);
+ LOGGER.warn("Skip cron job of note: " + noteId + ", because fail to get it", e);
return;
}
if (note.haveRunningOrPendingParagraphs()) {
- logger.warn(
+ LOGGER.warn(
"execution of the cron job is skipped because there is a running or pending "
+ "paragraph (note id: {})",
noteId);
@@ -62,7 +61,7 @@ public class CronJob implements org.quartz.Job {
}
if (!note.isCronSupported(notebook.getConf())) {
- logger.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server");
+ LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server");
return;
}
@@ -70,19 +69,17 @@ public class CronJob implements org.quartz.Job {
boolean releaseResource = false;
String cronExecutingUser = null;
- try {
- Map<String, Object> config = note.getConfig();
- if (config != null) {
- if (config.containsKey("releaseresource")) {
- releaseResource = (boolean) config.get("releaseresource");
- }
- cronExecutingUser = (String) config.get("cronExecutingUser");
+ Map<String, Object> config = note.getConfig();
+ if (config != null) {
+ if (config.containsKey("releaseresource")) {
+ releaseResource = (boolean) config.get("releaseresource");
}
- } catch (ClassCastException e) {
- logger.error(e.getMessage(), e);
+ cronExecutingUser = (String) config.get("cronExecutingUser");
}
+
if (releaseResource) {
- for (InterpreterSetting setting : note.getBindedInterpreterSettings()) {
+ LOGGER.info("Releasing interpreters used by this note: " + noteId);
+ for (InterpreterSetting setting : note.getUsedInterpreterSettings()) {
try {
notebook
.getInterpreterSettingManager()
@@ -91,7 +88,7 @@ public class CronJob implements org.quartz.Job {
noteId,
cronExecutingUser != null ? cronExecutingUser : "anonymous");
} catch (InterpreterException e) {
- logger.error("Fail to restart interpreter: " + setting.getId(), e);
+ LOGGER.error("Fail to restart interpreter: " + setting.getId(), e);
}
}
}
@@ -111,7 +108,7 @@ public class CronJob implements org.quartz.Job {
try {
note.runAll(authenticationInfo, true);
} catch (Exception e) {
- logger.warn("Fail to run note", e);
+ LOGGER.warn("Fail to run note: " + note.getName(), e);
}
}
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
index bceeb9c..cf1c0a3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
@@ -76,24 +76,32 @@ public class QuartzSchedulerService implements SchedulerService {
try {
note = notebook.getNote(noteId);
} catch (IOException e) {
- LOGGER.warn("Fail to get note: " + noteId, e);
+ LOGGER.warn("Skip refresh cron of note: " + noteId + " because fail to get it", e);
return;
}
- if (note == null || note.isTrash()) {
+ if (note == null) {
+ LOGGER.warn("Skip refresh cron of note: " + noteId + " because there's no such note");
return;
}
+ if (note.isTrash()) {
+ LOGGER.warn("Skip refresh cron of note: " + noteId + " because it is in trash");
+ return;
+ }
+
Map<String, Object> config = note.getConfig();
if (config == null) {
+ LOGGER.warn("Skip refresh cron of note: " + noteId + " because its config is empty.");
return;
}
if (!note.isCronSupported(zeppelinConfiguration)) {
- LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server");
+ LOGGER.warn("Skip refresh cron of note " + noteId + " because its cron is not enabled.");
return;
}
String cronExpr = (String) note.getConfig().get("cron");
if (cronExpr == null || cronExpr.trim().length() == 0) {
+ LOGGER.warn("Skip refresh cron of note " + noteId + " because its cron expression is empty.");
return;
}
@@ -122,16 +130,17 @@ public class QuartzSchedulerService implements SchedulerService {
.forJob(noteId, "note")
.build();
} catch (Exception e) {
- LOGGER.error("Error", e);
+ LOGGER.error("Fail to create cron trigger for note: " + note.getName(), e);
info.put("cron", e.getMessage());
}
try {
if (trigger != null) {
+ LOGGER.info("Trigger cron for note: " + note.getName() + ", with cron expression: " + cronExpr);
scheduler.scheduleJob(newJob, trigger);
}
} catch (SchedulerException e) {
- LOGGER.error("Error", e);
+ LOGGER.error("Fail to schedule cron job for note: " + note.getName(), e);
info.put("cron", "Scheduler Exception");
}
}