You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/04/22 04:44:07 UTC

zeppelin git commit: [ZEPPELIN-1828] Flaky Test: RemoteInterpreterTest hanged in zeppelin-interpreter

Repository: zeppelin
Updated Branches:
  refs/heads/master dad440dd1 -> 2f9011405


[ZEPPELIN-1828] Flaky Test: RemoteInterpreterTest hanged in zeppelin-interpreter

### What is this PR for?
When `interpreter.close()` is called, it shuts down _RemoteInterpreterEventPoller_ and then tries to close the thrift connection. While closing, the thrift client waits until the event queue is empty. If _RemoteInterpreterEventPoller_ has not processed all thrift events before the shutdown signal arrives, the thrift client will wait forever. As a result, Travis terminates the build after the code has not responded for 10 minutes.

### What type of PR is it?
[Bug Fix]

### Todos
We need to:
* [x] - write a test that checks that event queue is empty after _RemoteInterpreterEventPoller_ is done,
* [x] - clear unread events on _RemoteInterpreterEventPoller_ shutdown signal.

### What is the Jira issue?
[ZEPPELIN-1828](https://issues.apache.org/jira/browse/ZEPPELIN-1828)

### How should this be tested?
This issue can be reproduced if you add `Thread.sleep(3000L);` to _RemoteInterpreterEventPoller.java_
```
...
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
    Thread.sleep(3000L);
    ...
}
```
and run _RemoteInterpreterTest.java_.

### Questions:
* Do the license files need an update? **no**
* Are there breaking changes for older versions? **no**
* Does this need documentation? **no**

Author: Alexander Shoshin <Al...@epam.com>

Closes #2246 from AlexanderShoshin/ZEPPELIN-1828 and squashes the following commits:

5818a83 [Alexander Shoshin] clear unread events on shutdown
66fe9c7 [Alexander Shoshin] test that EventPoller clear unread events on shutdown


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2f901140
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2f901140
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2f901140

Branch: refs/heads/master
Commit: 2f90114055e4f5af0f6d051d0ce1da1c6204bd97
Parents: dad440d
Author: Alexander Shoshin <Al...@epam.com>
Authored: Wed Apr 12 11:04:31 2017 +0300
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Apr 21 21:44:05 2017 -0700

----------------------------------------------------------------------
 .../remote/RemoteInterpreterEventPoller.java    |  9 ++++
 .../RemoteInterpreterEventPollerTest.java       | 55 ++++++++++++++++++++
 2 files changed, 64 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f901140/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c841c71..126a46f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -266,11 +266,20 @@ public class RemoteInterpreterEventPoller extends Thread {
         logger.error("Can't handle event " + event, e);
       }
     }
+    try {
+      clearUnreadEvents(interpreterProcess.getClient());
+    } catch (Exception e1) {
+      logger.error("Can't get RemoteInterpreterEvent", e1);
+    }
     if (appendFuture != null) {
       appendFuture.cancel(true);
     }
   }
 
+  private void clearUnreadEvents(Client client) throws TException {
+    while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
+  }
+
   private void progressRemoteZeppelinControlEvent(
       RemoteZeppelinServerResource.Type resourceType,
       RemoteInterpreterProcessListener remoteWorksEventListener,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f901140/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteInterpreterEventPollerTest {
+
+	@Test
+	public void shouldClearUnreadEventsOnShutdown() throws Exception {
+		RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
+		RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
+
+		eventPoller.setInterpreterProcess(interpreterProc);
+		eventPoller.shutdown();
+		eventPoller.start();
+		eventPoller.join();
+
+		assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
+	}
+
+	private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+		RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
+		RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
+		RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
+		RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
+
+		when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
+		when(intProc.getClient()).thenReturn(client);
+
+		return intProc;
+	}
+}