You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/02/17 11:32:03 UTC
zeppelin git commit: [ZEPPELIN-2065] Flaky test:
NotebookTest.testSchedulePoolUsage
Repository: zeppelin
Updated Branches:
refs/heads/master 05f293553 -> 82c765042
[ZEPPELIN-2065] Flaky test: NotebookTest.testSchedulePoolUsage
### What is this PR for?
`testSchedulePoolUsage()` test checks that jobs can be executed several times in a row using a cron expression.
It failed sometimes when it checked job completion by monitoring `job.dateFinished` and then tried to assert that `job.status` is `FINISHED`. It was not always true because `job.status` is changed after `job.dateFinished`.
### What type of PR is it?
Bug Fix
### Todos
* [ ] We need to catch job completion by checking `job.status` instead of `job.dateFinished`.
* [ ] It is better to catch job completion by adding job status change listener instead of using `Thread.sleep(..)` in a test method.
* [ ] We can make this test simpler by removing the check that a job will not run after a cron expression is removed. We already have such a check in a previous unit test named `testSchedule()`.
### What is the Jira issue?
[ZEPPELIN-2065](https://issues.apache.org/jira/browse/ZEPPELIN-2065)
### Questions:
* Do the license files need an update? **no**
* Are there breaking changes for older versions? **no**
* Does this need documentation? **no**
Author: Alexander Shoshin <Al...@epam.com>
This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>
Closes #2014 from AlexanderShoshin/ZEPPELIN-2065 and squashes the following commits:
c2efa8d [Alexander Shoshin] replace AtomicInteger by CountDownLatch
8a0fe02 [Alexander Shoshin] use job status listener to count jobs launched by cron expression
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/82c76504
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/82c76504
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/82c76504
Branch: refs/heads/master
Commit: 82c765042181c56b330ab69650d717442f313288
Parents: 05f2935
Author: Alexander Shoshin <Al...@epam.com>
Authored: Mon Feb 13 17:13:46 2017 +0300
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Feb 17 20:31:52 2017 +0900
----------------------------------------------------------------------
.../apache/zeppelin/notebook/NotebookTest.java | 104 ++++++++-----------
1 file changed, 42 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/82c76504/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 78903a9..c3ba905 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.dep.DependencyResolver;
@@ -69,6 +69,7 @@ public class NotebookTest implements JobListenerFactory{
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
+ private StatusChangedListener afterStatusChangedListener;
@Before
public void setUp() throws Exception {
@@ -365,71 +366,43 @@ public class NotebookTest implements JobListenerFactory{
@Test
public void testSchedulePoolUsage() throws InterruptedException, IOException {
- // create a note and a paragraph
- Note note = notebook.createNote(anonymous);
- interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
- Paragraph p = note.addParagraph(AuthenticationInfo.ANONYMOUS);
- Map config = Maps.newHashMap();
- p.setConfig(config);
- Date dateFinished = p.getDateFinished();
- String result = getResultString(p.getResult());
- assertEquals(result, StringUtils.EMPTY);
- assertNull(dateFinished);
-
- // set cron scheduler, once a second
- config = note.getConfig();
- config.put("enabled", true);
- config.put("cron", "* * * * * ?");
- note.setConfig(config);
- notebook.refreshCron(note.getId());
-
- // run job maxExecutionCount times
- int maxExecutionCount = 13;
- int maxRetryCount = 4 * maxExecutionCount;
- int executionCount = 0;
- int retryCount = 0;
- String resultTemplate = "%text repl1: p";
- p.setText("p" + executionCount);
-
- while (executionCount < maxExecutionCount) {
- if (p.getDateFinished() != null && !p.getDateFinished().equals(dateFinished)) {
- // paragraph has been executed
- assertNotEquals(dateFinished, p.getDateFinished());
- assertNotEquals(result, getResultString(p.getResult()));
- assertEquals(p.getResult().toString(), resultTemplate + executionCount);
- assertEquals(p.getStatus(), Status.FINISHED);
- executionCount++;
- dateFinished = p.getDateFinished();
- result = getResultString(p.getResult());
- p.setText("p" + executionCount);
- }
- Thread.sleep(1100);
- if (++retryCount > maxRetryCount) {
- logger.error("Couldn't schedule {} number of note executions after {} retries",
- maxExecutionCount, maxRetryCount);
- fail();
+ final int timeout = 30;
+ final String everySecondCron = "* * * * * ?";
+ final CountDownLatch jobsToExecuteCount = new CountDownLatch(13);
+ final Note note = notebook.createNote(anonymous);
+
+ executeNewParagraphByCron(note, everySecondCron);
+ afterStatusChangedListener = new StatusChangedListener() {
+ @Override
+ public void onStatusChanged(Job job, Status before, Status after) {
+ if (after == Status.FINISHED) {
+ jobsToExecuteCount.countDown();
+ }
}
- }
-
- // save results and update paragraph
- dateFinished = p.getDateFinished();
- result = getResultString(p.getResult());
- p.setText("new text");
- // remove cron scheduler
- config.put("cron", null);
+ };
+
+ assertTrue(jobsToExecuteCount.await(timeout, TimeUnit.SECONDS));
+
+ terminateScheduledNote(note);
+ afterStatusChangedListener = null;
+ }
+
+ private void executeNewParagraphByCron(Note note, String cron) {
+ Paragraph paragraph = note.addParagraph(AuthenticationInfo.ANONYMOUS);
+ paragraph.setText("p");
+ Map<String, Object> config = note.getConfig();
+ config.put("enabled", true);
+ config.put("cron", cron);
note.setConfig(config);
notebook.refreshCron(note.getId());
-
- Thread.sleep(1100);
-
- // ensure that hasn't been run again
- assertEquals(dateFinished, p.getDateFinished());
- assertEquals(result, getResultString(p.getResult()));
}
-
- private String getResultString(InterpreterResult result) {
- return result == null ? StringUtils.EMPTY : result.toString();
+
+ private void terminateScheduledNote(Note note) {
+ note.getConfig().remove("cron");
+ notebook.refreshCron(note.getId());
+ notebook.removeNote(note.getId(), anonymous);
}
+
@Test
public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
@@ -1251,7 +1224,14 @@ public class NotebookTest implements JobListenerFactory{
@Override
public void afterStatusChange(Job job, Status before, Status after) {
+ if (afterStatusChangedListener != null) {
+ afterStatusChangedListener.onStatusChanged(job, before, after);
+ }
}
};
}
+
+ private interface StatusChangedListener {
+ void onStatusChanged(Job job, Status before, Status after);
+ }
}