You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/03 05:53:36 UTC

tez git commit: TEZ-2391. TestVertexImpl timing out at times on jenkins builds (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master a3950efd2 -> 7b2ec39f0


TEZ-2391. TestVertexImpl timing out at times on jenkins builds (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b2ec39f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b2ec39f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b2ec39f

Branch: refs/heads/master
Commit: 7b2ec39f04519a3869981b4685826e23096d78e8
Parents: a3950ef
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 3 11:53:29 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Jun 3 11:53:29 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/dag/TestStateChangeNotifier.java    | 18 +++++++++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 38 +++++++++++++++++---
 3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c90e1cc..c7262b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2391. TestVertexImpl timing out at times on jenkins builds.
   TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
   TEZ-2527. Tez UI: Application hangs on entering erroneous RegEx in counter table search box
   TEZ-2523. Tez UI: derive applicationId from dag/vertex id instead of relying on json data

http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
index e6d1c31..d20903d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
@@ -31,6 +31,7 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import com.google.common.collect.Lists;
 
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -42,15 +43,18 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestStateChangeNotifier {
   
   // uses the thread based notification code path but effectively blocks update
   // events till listeners have been notified
   public static class StateChangeNotifierForTest extends StateChangeNotifier {
+    private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifierForTest.class);
     AtomicInteger count = new AtomicInteger(0);
     AtomicInteger totalCount = new AtomicInteger(0);
-    
+
     public StateChangeNotifierForTest(DAG dag) {
       super(dag);
     }
@@ -62,6 +66,18 @@ public class TestStateChangeNotifier {
     
     @Override
     protected void processedEventFromQueue() {
+      // addedEventToQueue runs in dispatcher thread while
+      // processedEventFromQueue runs in state change notifier event handling thread.
+      // It is not guaranteed that addedEventToQueue is invoked before processedEventFromQueue.
+      // so sleep here until there's available events
+      while(count.get() <=0) {
+        try {
+          Thread.sleep(10);
+          LOG.info("sleep to wait for available events");
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
       synchronized (count) {
         if (count.decrementAndGet() == 0) {
           count.notifyAll();

http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0176b79..aeea407 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2143,6 +2143,10 @@ public class TestVertexImpl {
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void setupPostDagCreation() throws AMUserCodeException {
     String dagName = "dag0";
+    // dispatcher may be created multiple times (setupPostDagCreation may be called multiples)
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
     dispatcher = new DrainDispatcher();
     appContext = mock(AppContext.class);
     thh = mock(TaskHeartbeatHandler.class);
@@ -2187,6 +2191,10 @@ public class TestVertexImpl {
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
     }
+    // updateTracker may be created multiple times (setupPostDagCreation may be called multiples)
+    if (updateTracker != null) {
+      updateTracker.stop();
+    }
     updateTracker = new StateChangeNotifierForTest(appContext.getCurrentDAG());
     setupVertices();
     when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
@@ -2256,11 +2264,11 @@ public class TestVertexImpl {
 
   @After
   public void teardown() {
-    updateTracker.stop();
     if (dispatcher.isInState(STATE.STARTED)) {
       dispatcher.await();
       dispatcher.stop();
     }
+    updateTracker.stop();
     execService.shutdownNow();
     dispatcher = null;
     vertexEventDispatcher = null;
@@ -3844,8 +3852,9 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
 
-    initializer.go();
     while (v1.getState() == VertexState.INITIALIZING || v1.getState() == VertexState.INITED) {
+      // initializer thread may not have started, so call initializer.go() in the loop all the time
+      initializer.go();
       Thread.sleep(10);
     }
 
@@ -3864,6 +3873,7 @@ public class TestVertexImpl {
     // which is required to track events that it receives.
     EventHandlingRootInputInitializer initializer =
         (EventHandlingRootInputInitializer) customInitializer;
+    initializer.setNumVertexStateUpdateEvents(3);
     setupPreDagCreation();
     dagPlan = createDAGPlanWithRunningInitializer();
     setupPostDagCreation();
@@ -3882,8 +3892,9 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
 
-    // At this point, 3 events should have been received - since the dispatcher is complete.
-    Assert.assertEquals(3, initializer.stateUpdates.size());
+    // wait for state update events are received, this is done in the state notifier thread
+    initializer.waitForVertexStateUpdate();
+
     Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
         initializer.stateUpdates.get(0).getVertexState());
     Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
@@ -5988,6 +5999,8 @@ public class TestVertexImpl {
     private final Condition eventCondition = lock.newCondition();
 
     private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
+    private int numExpectedVertexStateUpdate = 1;
+    private Object waitForVertexStateUpdate = new Object();
     private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
     private volatile InputInitializerContext context;
     private volatile int numExpectedEvents = 1;
@@ -6054,11 +6067,28 @@ public class TestVertexImpl {
       this.numExpectedEvents = numEvents;
     }
 
+    public void setNumVertexStateUpdateEvents(int numEvents) {
+      this.numExpectedVertexStateUpdate = numEvents;
+    }
+
     public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
       if (exLocation == IIExceptionLocation.OnVertexStateUpdated) {
         throw new RuntimeException(exLocation.name());
       }
       stateUpdates.add(stateUpdate);
+      if (stateUpdates.size() == numExpectedVertexStateUpdate) {
+        synchronized (waitForVertexStateUpdate) {
+          waitForVertexStateUpdate.notify();
+        }
+      }
+    }
+
+    public void waitForVertexStateUpdate() throws InterruptedException {
+      if (stateUpdates.size() < numExpectedVertexStateUpdate) {
+        synchronized (waitForVertexStateUpdate) {
+          waitForVertexStateUpdate.wait();
+        }
+      }
     }
   }