You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/06/03 05:53:36 UTC
tez git commit: TEZ-2391. TestVertexImpl timing out at times on
jenkins builds (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master a3950efd2 -> 7b2ec39f0
TEZ-2391. TestVertexImpl timing out at times on jenkins builds (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7b2ec39f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7b2ec39f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7b2ec39f
Branch: refs/heads/master
Commit: 7b2ec39f04519a3869981b4685826e23096d78e8
Parents: a3950ef
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Jun 3 11:53:29 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Jun 3 11:53:29 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/dag/TestStateChangeNotifier.java | 18 +++++++++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 38 +++++++++++++++++---
3 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c90e1cc..c7262b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2391. TestVertexImpl timing out at times on jenkins builds.
TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
TEZ-2527. Tez UI: Application hangs on entering erroneous RegEx in counter table search box
TEZ-2523. Tez UI: derive applicationId from dag/vertex id instead of relying on json data
http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
index e6d1c31..d20903d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestStateChangeNotifier.java
@@ -31,6 +31,7 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+
import com.google.common.collect.Lists;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -42,15 +43,18 @@ import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestStateChangeNotifier {
// uses the thread based notification code path but effectively blocks update
// events till listeners have been notified
public static class StateChangeNotifierForTest extends StateChangeNotifier {
+ private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifierForTest.class);
AtomicInteger count = new AtomicInteger(0);
AtomicInteger totalCount = new AtomicInteger(0);
-
+
public StateChangeNotifierForTest(DAG dag) {
super(dag);
}
@@ -62,6 +66,18 @@ public class TestStateChangeNotifier {
@Override
protected void processedEventFromQueue() {
+ // addedEventToQueue runs in dispatcher thread while
+ // processedEventFromQueue runs in state change notifier event handling thread.
+ // It is not guaranteed that addedEventToQueue is invoked before processedEventFromQueue.
+ // so sleep here until there's available events
+ while(count.get() <=0) {
+ try {
+ Thread.sleep(10);
+ LOG.info("sleep to wait for available events");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
synchronized (count) {
if (count.decrementAndGet() == 0) {
count.notifyAll();
http://git-wip-us.apache.org/repos/asf/tez/blob/7b2ec39f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0176b79..aeea407 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2143,6 +2143,10 @@ public class TestVertexImpl {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void setupPostDagCreation() throws AMUserCodeException {
String dagName = "dag0";
+ // dispatcher may be created multiple times (setupPostDagCreation may be called multiples)
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
dispatcher = new DrainDispatcher();
appContext = mock(AppContext.class);
thh = mock(TaskHeartbeatHandler.class);
@@ -2187,6 +2191,10 @@ public class TestVertexImpl {
for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
+ // updateTracker may be created multiple times (setupPostDagCreation may be called multiples)
+ if (updateTracker != null) {
+ updateTracker.stop();
+ }
updateTracker = new StateChangeNotifierForTest(appContext.getCurrentDAG());
setupVertices();
when(dag.getVertex(any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() {
@@ -2256,11 +2264,11 @@ public class TestVertexImpl {
@After
public void teardown() {
- updateTracker.stop();
if (dispatcher.isInState(STATE.STARTED)) {
dispatcher.await();
dispatcher.stop();
}
+ updateTracker.stop();
execService.shutdownNow();
dispatcher = null;
vertexEventDispatcher = null;
@@ -3844,8 +3852,9 @@ public class TestVertexImpl {
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
- initializer.go();
while (v1.getState() == VertexState.INITIALIZING || v1.getState() == VertexState.INITED) {
+ // initializer thread may not have started, so call initializer.go() in the loop all the time
+ initializer.go();
Thread.sleep(10);
}
@@ -3864,6 +3873,7 @@ public class TestVertexImpl {
// which is required to track events that it receives.
EventHandlingRootInputInitializer initializer =
(EventHandlingRootInputInitializer) customInitializer;
+ initializer.setNumVertexStateUpdateEvents(3);
setupPreDagCreation();
dagPlan = createDAGPlanWithRunningInitializer();
setupPostDagCreation();
@@ -3882,8 +3892,9 @@ public class TestVertexImpl {
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
- // At this point, 3 events should have been received - since the dispatcher is complete.
- Assert.assertEquals(3, initializer.stateUpdates.size());
+ // wait for state update events are received, this is done in the state notifier thread
+ initializer.waitForVertexStateUpdate();
+
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED,
initializer.stateUpdates.get(0).getVertexState());
Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING,
@@ -5988,6 +5999,8 @@ public class TestVertexImpl {
private final Condition eventCondition = lock.newCondition();
private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
+ private int numExpectedVertexStateUpdate = 1;
+ private Object waitForVertexStateUpdate = new Object();
private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
private volatile InputInitializerContext context;
private volatile int numExpectedEvents = 1;
@@ -6054,11 +6067,28 @@ public class TestVertexImpl {
this.numExpectedEvents = numEvents;
}
+ public void setNumVertexStateUpdateEvents(int numEvents) {
+ this.numExpectedVertexStateUpdate = numEvents;
+ }
+
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
if (exLocation == IIExceptionLocation.OnVertexStateUpdated) {
throw new RuntimeException(exLocation.name());
}
stateUpdates.add(stateUpdate);
+ if (stateUpdates.size() == numExpectedVertexStateUpdate) {
+ synchronized (waitForVertexStateUpdate) {
+ waitForVertexStateUpdate.notify();
+ }
+ }
+ }
+
+ public void waitForVertexStateUpdate() throws InterruptedException {
+ if (stateUpdates.size() < numExpectedVertexStateUpdate) {
+ synchronized (waitForVertexStateUpdate) {
+ waitForVertexStateUpdate.wait();
+ }
+ }
}
}