You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/04/22 04:44:07 UTC
zeppelin git commit: [ZEPPELIN-1828] Flaky Test:
RemoteInterpreterTest hanged in zeppelin-interpreter
Repository: zeppelin
Updated Branches:
refs/heads/master dad440dd1 -> 2f9011405
[ZEPPELIN-1828] Flaky Test: RemoteInterpreterTest hanged in zeppelin-interpreter
### What is this PR for?
When `interpreter.close()` occurs in code it shuts down _RemoteInterpreterEventPoller_ and then tries to close thrift connection. While closing thrift client will wait until events queue is empty. If _RemoteInterpreterEventPoller_ has not processed all thrift events before the shutdown signal comes, thrift client will wait forever. As a result Travis will terminate the build after code is not responding for 10 minutes.
### What type of PR is it?
[Bug Fix]
### Todos
We need to:
* [x] - write a test that checks that event queue is empty after _RemoteInterpreterEventPoller_ is done,
* [x] - clear unread events on _RemoteInterpreterEventPoller_ shutdown signal.
### What is the Jira issue?
[ZEPPELIN-1828](https://issues.apache.org/jira/browse/ZEPPELIN-1828)
### How should this be tested?
This issue can be reprodused if you put `Thread.sleep(3000L);` to _RemoteInterpreterEventPoller.java_
```
...
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
Thread.sleep(3000L);
...
}
```
and run _RemoteInterpreterTest.java_.
### Questions:
* Does the licenses files need update? **no**
* Is there breaking changes for older versions? **no**
* Does this needs documentation? **no**
Author: Alexander Shoshin <Al...@epam.com>
Closes #2246 from AlexanderShoshin/ZEPPELIN-1828 and squashes the following commits:
5818a83 [Alexander Shoshin] clear unread events on shutdown
66fe9c7 [Alexander Shoshin] test that EventPoller clear unread events on shutdown
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2f901140
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2f901140
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2f901140
Branch: refs/heads/master
Commit: 2f90114055e4f5af0f6d051d0ce1da1c6204bd97
Parents: dad440d
Author: Alexander Shoshin <Al...@epam.com>
Authored: Wed Apr 12 11:04:31 2017 +0300
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Apr 21 21:44:05 2017 -0700
----------------------------------------------------------------------
.../remote/RemoteInterpreterEventPoller.java | 9 ++++
.../RemoteInterpreterEventPollerTest.java | 55 ++++++++++++++++++++
2 files changed, 64 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f901140/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index c841c71..126a46f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -266,11 +266,20 @@ public class RemoteInterpreterEventPoller extends Thread {
logger.error("Can't handle event " + event, e);
}
}
+ try {
+ clearUnreadEvents(interpreterProcess.getClient());
+ } catch (Exception e1) {
+ logger.error("Can't get RemoteInterpreterEvent", e1);
+ }
if (appendFuture != null) {
appendFuture.cancel(true);
}
}
+ private void clearUnreadEvents(Client client) throws TException {
+ while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
+ }
+
private void progressRemoteZeppelinControlEvent(
RemoteZeppelinServerResource.Type resourceType,
RemoteInterpreterProcessListener remoteWorksEventListener,
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2f901140/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteInterpreterEventPollerTest {
+
+ @Test
+ public void shouldClearUnreadEventsOnShutdown() throws Exception {
+ RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
+ RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
+
+ eventPoller.setInterpreterProcess(interpreterProc);
+ eventPoller.shutdown();
+ eventPoller.start();
+ eventPoller.join();
+
+ assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
+ }
+
+ private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+ RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
+ RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
+ RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
+ RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
+
+ when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
+ when(intProc.getClient()).thenReturn(client);
+
+ return intProc;
+ }
+}