You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/05/20 19:07:54 UTC

zeppelin git commit: [ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown

Repository: zeppelin
Updated Branches:
  refs/heads/master 05af3bf4b -> c7c9aa1cc


[ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown

### What is this PR for?
There is a chance that a RemoteInterpreterServer hangs forever during shutdown.

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
[ZEPPELIN-2502]

### How should this be tested?
Unit test provided for the fix.

### Questions:
* Are there breaking changes for older versions?
* Does this need documentation?

Author: andrea <an...@gmail.com>

Closes #2322 from andreaTP/processHang and squashes the following commits:

e58483e [andrea] [ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c7c9aa1c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c7c9aa1c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c7c9aa1c

Branch: refs/heads/master
Commit: c7c9aa1ccb14b8394cc01d91976f6ba61458f378
Parents: 05af3bf
Author: andrea <an...@gmail.com>
Authored: Thu May 4 13:52:22 2017 +0100
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat May 20 15:07:47 2017 -0400

----------------------------------------------------------------------
 .../remote/RemoteInterpreterEventClient.java    |  7 ++-
 .../remote/RemoteInterpreterServer.java         |  7 ++-
 .../remote/RemoteInterpreterServerTest.java     | 63 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index bb6de31..2cdbf39 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -477,15 +477,18 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
   /**
    * Wait for eventQueue becomes empty
    */
-  public void waitForEventQueueBecomesEmpty() {
+  public void waitForEventQueueBecomesEmpty(long atMost) {
+    long startTime = System.currentTimeMillis();
     synchronized (eventQueue) {
-      while (!eventQueue.isEmpty()) {
+      while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) {
         try {
           eventQueue.wait(100);
         } catch (InterruptedException e) {
           // ignore exception
         }
       }
+      if (!eventQueue.isEmpty())
+        eventQueue.clear();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 50881ca..719d2dd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -80,6 +80,8 @@ public class RemoteInterpreterServer
   private Map<String, Object> remoteWorksResponsePool;
   private ZeppelinRemoteWorksController remoteWorksController;
 
+  private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+
   public RemoteInterpreterServer(int port) throws TTransportException {
     this.port = port;
 
@@ -99,7 +101,7 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
-    eventClient.waitForEventQueueBecomesEmpty();
+    eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
     if (interpreterGroup != null) {
       interpreterGroup.close();
     }
@@ -111,7 +113,8 @@ public class RemoteInterpreterServer
     // this case, need to force kill the process
 
     long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) {
+    while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT &&
+        server.isServing()) {
       try {
         Thread.sleep(300);
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index af6b4bd..a4b3a25 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -20,6 +20,9 @@ package org.apache.zeppelin.interpreter.remote;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
@@ -72,5 +75,65 @@ public class RemoteInterpreterServerTest {
     assertEquals(false, running);
   }
 
+  class ShutdownRun implements Runnable {
+    private RemoteInterpreterServer serv = null;
+    public ShutdownRun(RemoteInterpreterServer serv) {
+      this.serv = serv;
+    }
+    @Override
+    public void run() {
+      try {
+        serv.shutdown();
+      } catch (Exception ex) {};
+    }
+  };
+
+  @Test
+  public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+    assertEquals(false, server.isRunning());
+
+    server.start();
+    long startTime = System.currentTimeMillis();
+    boolean running = false;
+
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        running = true;
+        break;
+      } else {
+        Thread.sleep(200);
+      }
+    }
+
+    assertEquals(true, running);
+    assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort()));
+
+    //just send an event on the client queue
+    server.eventClient.onAppStatusUpdate("","","","");
+
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+    Runnable task = new ShutdownRun(server);
+
+    executor.schedule(task, 0, TimeUnit.MILLISECONDS);
+
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        Thread.sleep(200);
+      } else {
+        running = false;
+        break;
+      }
+    }
+
+    executor.shutdown();
+
+    //cleanup environment for next tests
+    server.shutdown();
+
+    assertEquals(false, running);
+  }
 
 }