You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/11/04 02:03:18 UTC
[zeppelin] branch master updated: [ZEPPELIN-5067]. Add ZSession cleanup thread
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 29bbd00 [ZEPPELIN-5067]. Add ZSession cleanup thread
29bbd00 is described below
commit 29bbd002af33369d043e5263a8dd86a9b562806e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Oct 30 11:02:30 2020 +0800
[ZEPPELIN-5067]. Add ZSession cleanup thread
### What is this PR for?
This PR creates a scheduled task that periodically checks session state. If a session is stopped, the task cleans up the associated session resources (e.g. removes the session note).
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5067
### How should this be tested?
* A unit test is added
https://travis-ci.com/github/zjffdu/zeppelin/builds/196346111
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3960 from zjffdu/ZEPPELIN-5067 and squashes the following commits:
9508d2ce3 [Jeff Zhang] [ZEPPELIN-5067]. Add ZSession cleanup thread
90bec615f [Jeff Zhang] save
---
.../integration/ZSessionIntegrationTest.java | 45 ++++++++
.../integration/ZeppelinClientIntegrationTest.java | 8 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 3 +-
.../zeppelin/interpreter/util/ProcessLauncher.java | 2 +-
.../org/apache/zeppelin/rest/SessionRestApi.java | 51 ++++++---
.../SessionManagerService.java} | 115 ++++++++++++++++-----
.../org/apache/zeppelin/notebook/Notebook.java | 5 +
7 files changed, 187 insertions(+), 42 deletions(-)
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 2ec61bd..f086f78 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZSession;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
+import org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -42,6 +43,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class ZSessionIntegrationTest extends AbstractTestRestApi {
@@ -57,8 +59,14 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(),
"helium");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ALLOWED_ORIGINS.getVarName(), "*");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SESSION_CHECK_INTERVAL.getVarName(), "5000");
AbstractTestRestApi.startUp(ZSessionIntegrationTest.class.getSimpleName());
+ ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
+ zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(), TimeoutLifecycleManager.class.getName());
+ zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "5000");
+ zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000");
+
notebook = TestUtils.getInstance(Notebook.class);
sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
flinkHome = DownloadUtils.downloadFlink("1.10.1");
@@ -423,6 +431,43 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
}
}
+ @Test
+ public void testZSessionCleanup() throws Exception {
+ Map<String, String> intpProperties = new HashMap<>();
+
+ ZSession session = ZSession.builder()
+ .setClientConfig(clientConfig)
+ .setInterpreter("python")
+ .setIntpProperties(intpProperties)
+ .build();
+
+ try {
+ session.start(new SimpleMessageHandler());
+ assertNull(session.getWeburl());
+ assertNotNull(session.getNoteId());
+
+ assertTrue(notebook.getAllNotes().size() > 0);
+
+ Thread.sleep(30 * 1000);
+ assertEquals(0, notebook.getAllNotes().size());
+
+ try {
+ session.execute("1/0");
+ fail("Should fail to execute code after session is stopped");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } finally {
+ try {
+ session.stop();
+ fail("Should fail to stop session after it is stopped");
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertTrue(e.getMessage().contains("No such session"));
+ }
+ }
+ }
+
//@Test
public void testZSession_Jdbc() throws Exception {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
index 0e8a3c5..96ff276 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinClientIntegrationTest.java
@@ -419,6 +419,12 @@ public class ZeppelinClientIntegrationTest extends AbstractTestRestApi {
SessionInfo sessionInfo = zeppelinClient.getSession("invalid_session");
assertNull(sessionInfo);
- zeppelinClient.stopSession("invalid_session");
+ try {
+ zeppelinClient.stopSession("invalid_session");
+ fail("Should fail to stop session after it is stopped");
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertTrue(e.getMessage().contains("No such session"));
+ }
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 6c01070..342020d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -1111,7 +1111,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", true),
ZEPPELIN_SEARCH_INDEX_PATH("zeppelin.search.index.path", "/tmp/zeppelin-index"),
ZEPPELIN_JOBMANAGER_ENABLE("zeppelin.jobmanager.enable", false),
- ZEPPELIN_SPARK_ONLY_YARN_CLUSTER("zeppelin.spark.only_yarn_cluster", false);
+ ZEPPELIN_SPARK_ONLY_YARN_CLUSTER("zeppelin.spark.only_yarn_cluster", false),
+ ZEPPELIN_SESSION_CHECK_INTERVAL("zeppelin.session.check_interval", 60 * 10 * 1000);
private String varName;
@SuppressWarnings("rawtypes")
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index ff07efe..9f54e2c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -53,7 +53,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
private ExecuteWatchdog watchdog;
private ProcessLogOutputStream processOutput;
protected String errorMessage = null;
- protected State state = State.NEW;
+ protected volatile State state = State.NEW;
private boolean launchTimeout = false;
public ProcessLauncher(CommandLine commandLine,
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
index 33eb5b9..5306d68 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionRestApi.java
@@ -18,13 +18,13 @@
package org.apache.zeppelin.rest;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.common.SessionInfo;
import org.apache.zeppelin.rest.exception.SessionNoteFoundException;
import org.apache.zeppelin.server.JsonResponse;
+import org.apache.zeppelin.service.SessionManagerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ import javax.ws.rs.core.Response;
import java.util.List;
/**
- * Rest api endpoint for the ZSession.
+ * Rest api endpoint for ZSession operations.
*/
@Path("/session")
@Produces("application/json")
@@ -50,44 +50,69 @@ public class SessionRestApi {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionRestApi.class);
- private SessionManager sessionManager;
+ private SessionManagerService sessionManagerService;
@Inject
public SessionRestApi(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
- this.sessionManager = new SessionManager(notebook, interpreterSettingManager);
+ this.sessionManagerService = new SessionManagerService(notebook, interpreterSettingManager);
}
+ /**
+ * List all sessions when interpreter is not provided, otherwise only list all the sessions
+ * of this provided interpreter.
+ *
+ * @param interpreter
+ * @return
+ * @throws Exception
+ */
@GET
@Path("/")
public Response listSessions(@QueryParam("interpreter") String interpreter) throws Exception {
if (StringUtils.isBlank(interpreter)) {
- LOGGER.info("List all sessions");
+ LOGGER.info("List all sessions of all interpreters");
} else {
LOGGER.info("List all sessions for interpreter: " + interpreter);
}
List<SessionInfo> sessionList = null;
if (StringUtils.isBlank(interpreter)) {
- sessionList = sessionManager.getAllSessions();
+ sessionList = sessionManagerService.listSessions();
} else {
- sessionList = sessionManager.getAllSessions(interpreter);
+ sessionList = sessionManagerService.listSessions(interpreter);
}
return new JsonResponse<>(Response.Status.OK, sessionList).build();
}
+ /**
+ * Create a session for this provided interpreter.
+ *
+ * @param interpreter
+ * @return
+ * @throws Exception
+ */
@POST
@Path("/")
public Response createSession(@QueryParam("interpreter") String interpreter) throws Exception {
LOGGER.info("Create new session for interpreter: {}", interpreter);
- SessionInfo sessionInfo = sessionManager.createSession(interpreter);
+ SessionInfo sessionInfo = sessionManagerService.createSession(interpreter);
return new JsonResponse<>(Response.Status.OK, sessionInfo).build();
}
+ /**
+ * Stop the session of the provided sessionId.
+ *
+ * @param sessionId
+ * @return
+ */
@DELETE
@Path("{sessionId}")
public Response stopSession(@PathParam("sessionId") String sessionId) {
LOGGER.info("Stop session: {}", sessionId);
- sessionManager.removeSession(sessionId);
- return new JsonResponse<>(Response.Status.OK, sessionId).build();
+ try {
+ sessionManagerService.stopSession(sessionId);
+ return new JsonResponse<>(Response.Status.OK, sessionId).build();
+ } catch (Exception e) {
+ return new JsonResponse<>(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage()).build();
+ }
}
/**
@@ -97,11 +122,11 @@ public class SessionRestApi {
@Path("{sessionId}")
@ZeppelinApi
public Response getSession(@PathParam("sessionId") String sessionId) throws Exception {
- SessionInfo session = sessionManager.getSession(sessionId);
- if (session == null) {
+ SessionInfo sessionInfo = sessionManagerService.getSessionInfo(sessionId);
+ if (sessionInfo == null) {
throw new SessionNoteFoundException(sessionId);
} else {
- return new JsonResponse<>(Response.Status.OK, "", session).build();
+ return new JsonResponse<>(Response.Status.OK, "", sessionInfo).build();
}
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SessionManagerService.java
similarity index 51%
rename from zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
rename to zeppelin-server/src/main/java/org/apache/zeppelin/service/SessionManagerService.java
index 16cafa9..479cf19 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/SessionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SessionManagerService.java
@@ -16,44 +16,72 @@
*/
-package org.apache.zeppelin.rest;
+package org.apache.zeppelin.service;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.common.SessionInfo;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
*
- * Backend manager of ZSessions
+ * Service class of SessionManager.
*/
-public class SessionManager {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
+public class SessionManagerService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SessionManagerService.class);
private static final int RETRY = 3;
- private Map<String, SessionInfo> sessions = new HashMap<>();
+
+ private Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
private InterpreterSettingManager interpreterSettingManager;
private Notebook notebook;
+ private ScheduledExecutorService sessionCheckerExecutor;
- public SessionManager(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
+ public SessionManagerService(Notebook notebook, InterpreterSettingManager interpreterSettingManager) {
this.notebook = notebook;
this.interpreterSettingManager = interpreterSettingManager;
+ this.sessionCheckerExecutor = ExecutorFactory.singleton().createOrGetScheduled("Session-Checker-Executor", 1);
+ int sessionCheckerInterval = ZeppelinConfiguration.create()
+ .getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_SESSION_CHECK_INTERVAL);
+ this.sessionCheckerExecutor.scheduleAtFixedRate(() -> {
+ LOGGER.info("Start session check task");
+ Iterator<Map.Entry<String, SessionInfo>> iter = sessions.entrySet().iterator();
+ while(iter.hasNext()) {
+ Map.Entry<String, SessionInfo> entry = iter.next();
+ SessionInfo sessionInfo = null;
+ try {
+ sessionInfo = getSessionInfo(entry.getKey());
+ if (sessionInfo != null && sessionInfo.getState().equalsIgnoreCase("Stopped")) {
+ LOGGER.info("Session {} has been stopped, remove it and its associated note", entry.getKey());
+ try {
+ notebook.removeNote(sessionInfo.getNoteId(), AuthenticationInfo.ANONYMOUS);
+ } catch (IOException e) {
+ LOGGER.warn("Fail to remove session note: " + sessionInfo.getNoteId(), e);
+ }
+ iter.remove();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Fail to check session for session: " + entry.getKey(), e);
+ }
+ }
+ }, sessionCheckerInterval, sessionCheckerInterval, TimeUnit.MILLISECONDS);
}
/**
@@ -72,7 +100,7 @@ public class SessionManager {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOGGER.error("Interrupted", e);
}
} else {
break;
@@ -83,36 +111,49 @@ public class SessionManager {
throw new Exception("Unable to generate session id");
}
- Note sessionNote = notebook.createNote("/_ZSession/" + interpreter + "/" + sessionId, AuthenticationInfo.ANONYMOUS);
+ Note sessionNote = notebook.createNote(buildNotePath(interpreter, sessionId), AuthenticationInfo.ANONYMOUS);
SessionInfo sessionInfo = new SessionInfo(sessionId, sessionNote.getId(), interpreter);
sessions.put(sessionId, sessionInfo);
return sessionInfo;
}
+ private String buildNotePath(String interpreter, String sessionId) {
+ return "/_ZSession/" + interpreter + "/" + sessionId;
+ }
+
/**
* Remove and stop this session.
+ * 1. Stop associated interpreter process (InterpreterGroup)
+ * 2. Remove associated session note
*
* @param sessionId
*/
- public void removeSession(String sessionId) {
- this.sessions.remove(sessionId);
+ public void stopSession(String sessionId) throws Exception {
+ SessionInfo sessionInfo = this.sessions.remove(sessionId);
+ if (sessionInfo == null) {
+ throw new Exception("No such session: " + sessionId);
+ }
+ // stop the associated interpreter process
InterpreterGroup interpreterGroup = this.interpreterSettingManager.getInterpreterGroupById(sessionId);
if (interpreterGroup == null) {
- LOGGER.info("No interpreterGroup for session: " + sessionId);
+ LOGGER.info("No interpreterGroup for session: {}", sessionId);
return;
}
((ManagedInterpreterGroup) interpreterGroup).getInterpreterSetting().closeInterpreters(sessionId);
+
+ // remove associated session note
+ notebook.removeNote(sessionInfo.getNoteId(), AuthenticationInfo.ANONYMOUS);
}
/**
* Get the sessionInfo.
- * It method will also update its state if these's associated interpreter process.
+ * It method will also update its state if there's an associated interpreter process.
*
* @param sessionId
* @return
* @throws Exception
*/
- public SessionInfo getSession(String sessionId) throws Exception {
+ public SessionInfo getSessionInfo(String sessionId) throws Exception {
SessionInfo sessionInfo = sessions.get(sessionId);
if (sessionInfo == null) {
LOGGER.warn("No such session: " + sessionId);
@@ -123,16 +164,22 @@ public class SessionManager {
RemoteInterpreterProcess remoteInterpreterProcess =
((ManagedInterpreterGroup) interpreterGroup).getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
- sessionInfo.setState("Ready");
+ sessionInfo.setState(SessionState.READY.name());
} else if (remoteInterpreterProcess != null) {
sessionInfo.setStartTime(remoteInterpreterProcess.getStartTime());
sessionInfo.setWeburl(interpreterGroup.getWebUrl());
if (remoteInterpreterProcess.isRunning()) {
- sessionInfo.setState("Running");
+ sessionInfo.setState(SessionState.RUNNING.name());
} else {
- sessionInfo.setState("Stopped");
+ sessionInfo.setState(SessionState.STOPPED.name());
}
}
+ } else {
+ if (sessionInfo.getState().equals(SessionState.RUNNING.name())) {
+ // if it is running before, but interpreterGroup is null now, that means the session is stopped.
+ // e.g. InterpreterProcess is killed if it exceed idle timeout threshold.
+ sessionInfo.setState(SessionState.STOPPED.name());
+ }
}
return sessionInfo;
@@ -144,10 +191,15 @@ public class SessionManager {
* @return
* @throws Exception
*/
- public List<SessionInfo> getAllSessions() throws Exception {
+ public List<SessionInfo> listSessions() {
List<SessionInfo> sessionList = new ArrayList<>();
for (String sessionId : sessions.keySet()) {
- SessionInfo session = getSession(sessionId);
+ SessionInfo session = null;
+ try {
+ session = getSessionInfo(sessionId);
+ } catch (Exception e) {
+ LOGGER.warn("Fail to get sessionInfo for session: " + sessionId, e);
+ }
if (session != null) {
sessionList.add(session);
}
@@ -162,14 +214,25 @@ public class SessionManager {
* @return
* @throws Exception
*/
- public List<SessionInfo> getAllSessions(String interpreter) throws Exception {
+ public List<SessionInfo> listSessions(String interpreter) {
List<SessionInfo> sessionList = new ArrayList<>();
for (String sessionId : sessions.keySet()) {
- SessionInfo status = getSession(sessionId);
+ SessionInfo status = null;
+ try {
+ status = getSessionInfo(sessionId);
+ } catch (Exception e) {
+ LOGGER.warn("Fail to get sessionInfo for session: " + sessionId, e);
+ }
if (status != null && interpreter.equals(status.getInterpreter())) {
sessionList.add(status);
}
}
return sessionList;
}
+
+ enum SessionState {
+ READY,
+ RUNNING,
+ STOPPED
+ }
}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 3ccb10d..12faca3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -335,6 +335,11 @@ public class Notebook {
fireNoteRemoveEvent(note, subject);
}
+ public void removeNote(String noteId, AuthenticationInfo subject) throws IOException {
+ Note note = getNote(noteId);
+ removeNote(note, subject);
+ }
+
/**
* Get note from NotebookRepo and also initialize it with other properties that is not
* persistent in NotebookRepo, such as paragraphJobListener.