You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/05/20 19:07:54 UTC
zeppelin git commit: [ZEPPELIN-2502] RemoteInterpreterServer hang
forever during shutdown
Repository: zeppelin
Updated Branches:
refs/heads/master 05af3bf4b -> c7c9aa1cc
[ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown
### What is this PR for?
There is a chance that a RemoteInterpreterServer hangs forever during shutdown, because it waits without a timeout for the event queue to become empty.
### What type of PR is it?
[Bug Fix]
### What is the Jira issue?
[ZEPPELIN-2502]
### How should this be tested?
Unit test provided for the fix.
### Questions:
* Are there breaking changes for older versions?
* Does this need documentation?
Author: andrea <an...@gmail.com>
Closes #2322 from andreaTP/processHang and squashes the following commits:
e58483e [andrea] [ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c7c9aa1c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c7c9aa1c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c7c9aa1c
Branch: refs/heads/master
Commit: c7c9aa1ccb14b8394cc01d91976f6ba61458f378
Parents: 05af3bf
Author: andrea <an...@gmail.com>
Authored: Thu May 4 13:52:22 2017 +0100
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat May 20 15:07:47 2017 -0400
----------------------------------------------------------------------
.../remote/RemoteInterpreterEventClient.java | 7 ++-
.../remote/RemoteInterpreterServer.java | 7 ++-
.../remote/RemoteInterpreterServerTest.java | 63 ++++++++++++++++++++
3 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index bb6de31..2cdbf39 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -477,15 +477,18 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
/**
* Wait for eventQueue becomes empty
*/
- public void waitForEventQueueBecomesEmpty() {
+ public void waitForEventQueueBecomesEmpty(long atMost) {
+ long startTime = System.currentTimeMillis();
synchronized (eventQueue) {
- while (!eventQueue.isEmpty()) {
+ while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) {
try {
eventQueue.wait(100);
} catch (InterruptedException e) {
// ignore exception
}
}
+ if (!eventQueue.isEmpty())
+ eventQueue.clear();
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 50881ca..719d2dd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -80,6 +80,8 @@ public class RemoteInterpreterServer
private Map<String, Object> remoteWorksResponsePool;
private ZeppelinRemoteWorksController remoteWorksController;
+ private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@@ -99,7 +101,7 @@ public class RemoteInterpreterServer
@Override
public void shutdown() throws TException {
- eventClient.waitForEventQueueBecomesEmpty();
+ eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
if (interpreterGroup != null) {
interpreterGroup.close();
}
@@ -111,7 +113,8 @@ public class RemoteInterpreterServer
// this case, need to force kill the process
long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) {
+ while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT &&
+ server.isServing()) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index af6b4bd..a4b3a25 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -20,6 +20,9 @@ package org.apache.zeppelin.interpreter.remote;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
@@ -72,5 +75,65 @@ public class RemoteInterpreterServerTest {
assertEquals(false, running);
}
+ class ShutdownRun implements Runnable {
+ private RemoteInterpreterServer serv = null;
+ public ShutdownRun(RemoteInterpreterServer serv) {
+ this.serv = serv;
+ }
+ @Override
+ public void run() {
+ try {
+ serv.shutdown();
+ } catch (Exception ex) {};
+ }
+ };
+
+ @Test
+ public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
+ RemoteInterpreterServer server = new RemoteInterpreterServer(
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+ assertEquals(false, server.isRunning());
+
+ server.start();
+ long startTime = System.currentTimeMillis();
+ boolean running = false;
+
+ while (System.currentTimeMillis() - startTime < 10 * 1000) {
+ if (server.isRunning()) {
+ running = true;
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ }
+
+ assertEquals(true, running);
+ assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));
+
+ //just send an event on the client queue
+ server.eventClient.onAppStatusUpdate("","","","");
+
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+ Runnable task = new ShutdownRun(server);
+
+ executor.schedule(task, 0, TimeUnit.MILLISECONDS);
+
+ while (System.currentTimeMillis() - startTime < 10 * 1000) {
+ if (server.isRunning()) {
+ Thread.sleep(200);
+ } else {
+ running = false;
+ break;
+ }
+ }
+
+ executor.shutdown();
+
+ //cleanup environment for next tests
+ server.shutdown();
+
+ assertEquals(false, running);
+ }
}