You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/02/17 11:32:03 UTC

zeppelin git commit: [ZEPPELIN-2065] Flaky test: NotebookTest.testSchedulePoolUsage

Repository: zeppelin
Updated Branches:
  refs/heads/master 05f293553 -> 82c765042


[ZEPPELIN-2065] Flaky test: NotebookTest.testSchedulePoolUsage

### What is this PR for?

`testSchedulePoolUsage()` test checks that jobs can be executed several times in a row using a cron expression.

It sometimes failed because it detected job completion by monitoring `job.dateFinished` and then asserted that `job.status` was `FINISHED`. That assertion did not always hold, because `job.status` is changed after `job.dateFinished`.

### What type of PR is it?
Bug Fix

### Todos
* [ ] We need to catch job completion by checking `job.status` instead of `job.dateFinished`.
* [ ] It is better to catch job completion by adding a job status change listener instead of using `Thread.sleep(..)` in a test method.
* [ ] We can make this test simpler by removing the check that a job does not run after its cron expression is removed. We already have such a check in a previous unit test named `testSchedule()`.

### What is the Jira issue?
[ZEPPELIN-2065](https://issues.apache.org/jira/browse/ZEPPELIN-2065)

### Questions:
* Do the license files need an update? **no**
* Are there breaking changes for older versions? **no**
* Does this need documentation? **no**

Author: Alexander Shoshin <Al...@epam.com>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>

Closes #2014 from AlexanderShoshin/ZEPPELIN-2065 and squashes the following commits:

c2efa8d [Alexander Shoshin] replace AtomicInteger by CountDownLatch
8a0fe02 [Alexander Shoshin] use job status listener to count jobs launched by cron expression


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/82c76504
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/82c76504
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/82c76504

Branch: refs/heads/master
Commit: 82c765042181c56b330ab69650d717442f313288
Parents: 05f2935
Author: Alexander Shoshin <Al...@epam.com>
Authored: Mon Feb 13 17:13:46 2017 +0300
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Feb 17 20:31:52 2017 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/notebook/NotebookTest.java  | 104 ++++++++-----------
 1 file changed, 42 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/82c76504/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 78903a9..c3ba905 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.DependencyResolver;
@@ -69,6 +69,7 @@ public class NotebookTest implements JobListenerFactory{
   private NotebookAuthorization notebookAuthorization;
   private Credentials credentials;
   private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
+  private StatusChangedListener afterStatusChangedListener;
 
   @Before
   public void setUp() throws Exception {
@@ -365,71 +366,43 @@ public class NotebookTest implements JobListenerFactory{
 
   @Test
   public void testSchedulePoolUsage() throws InterruptedException, IOException {
-    // create a note and a paragraph
-    Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
-    Paragraph p = note.addParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = Maps.newHashMap();
-    p.setConfig(config);
-    Date dateFinished = p.getDateFinished();
-    String result = getResultString(p.getResult());
-    assertEquals(result, StringUtils.EMPTY);
-    assertNull(dateFinished);
-    
-    // set cron scheduler, once a second
-    config = note.getConfig();
-    config.put("enabled", true);
-    config.put("cron", "* * * * * ?");
-    note.setConfig(config);
-    notebook.refreshCron(note.getId());
-    
-    // run job maxExecutionCount times
-    int maxExecutionCount = 13;
-    int maxRetryCount = 4 * maxExecutionCount;
-    int executionCount = 0;
-    int retryCount = 0;
-    String resultTemplate = "%text repl1: p";
-    p.setText("p" + executionCount);
-    
-    while (executionCount < maxExecutionCount) {
-      if (p.getDateFinished() != null && !p.getDateFinished().equals(dateFinished)) {
-        // paragraph has been executed
-        assertNotEquals(dateFinished, p.getDateFinished());
-        assertNotEquals(result, getResultString(p.getResult()));
-        assertEquals(p.getResult().toString(), resultTemplate + executionCount);
-        assertEquals(p.getStatus(), Status.FINISHED);
-        executionCount++;
-        dateFinished = p.getDateFinished();
-        result = getResultString(p.getResult());
-        p.setText("p" + executionCount);
-      }
-      Thread.sleep(1100);
-      if (++retryCount > maxRetryCount) {
-        logger.error("Couldn't schedule {} number of note executions after {} retries",
-            maxExecutionCount, maxRetryCount);
-        fail();
+    final int timeout = 30;
+    final String everySecondCron = "* * * * * ?";
+    final CountDownLatch jobsToExecuteCount = new CountDownLatch(13);
+    final Note note = notebook.createNote(anonymous);
+
+    executeNewParagraphByCron(note, everySecondCron);
+    afterStatusChangedListener = new StatusChangedListener() {
+      @Override
+      public void onStatusChanged(Job job, Status before, Status after) {
+        if (after == Status.FINISHED) {
+          jobsToExecuteCount.countDown();
+        }
       }
-    }
-    
-    // save results and update paragraph
-    dateFinished = p.getDateFinished();
-    result = getResultString(p.getResult());
-    p.setText("new text");
-    // remove cron scheduler
-    config.put("cron", null);
+    };
+
+    assertTrue(jobsToExecuteCount.await(timeout, TimeUnit.SECONDS));
+
+    terminateScheduledNote(note);
+    afterStatusChangedListener = null;
+  }
+
+  private void executeNewParagraphByCron(Note note, String cron) {
+    Paragraph paragraph = note.addParagraph(AuthenticationInfo.ANONYMOUS);
+    paragraph.setText("p");
+    Map<String, Object> config = note.getConfig();
+    config.put("enabled", true);
+    config.put("cron", cron);
     note.setConfig(config);
     notebook.refreshCron(note.getId());
-    
-    Thread.sleep(1100);
-    
-    // ensure that hasn't been run again
-    assertEquals(dateFinished, p.getDateFinished());
-    assertEquals(result, getResultString(p.getResult()));
   }
-  
-  private String getResultString(InterpreterResult result) {
-    return result == null ? StringUtils.EMPTY : result.toString();
+
+  private void terminateScheduledNote(Note note) {
+    note.getConfig().remove("cron");
+    notebook.refreshCron(note.getId());
+    notebook.removeNote(note.getId(), anonymous);
   }
+
   
   @Test
   public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{
@@ -1251,7 +1224,14 @@ public class NotebookTest implements JobListenerFactory{
 
       @Override
       public void afterStatusChange(Job job, Status before, Status after) {
+        if (afterStatusChangedListener != null) {
+          afterStatusChangedListener.onStatusChanged(job, before, after);
+        }
       }
     };
   }
+
+  private interface StatusChangedListener {
+    void onStatusChanged(Job job, Status before, Status after);
+  }
 }