You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/10/01 01:47:33 UTC
tez git commit: TEZ-2855. Fix a potential NPE while routing
VertexManager events. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 427fa620c -> 97e1d867d
TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/97e1d867
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/97e1d867
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/97e1d867
Branch: refs/heads/master
Commit: 97e1d867d961c5ae277a00f55757b29bd7f82a87
Parents: 427fa62
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 30 16:47:21 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 30 16:47:21 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 24 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 172 ++++++++++++++++++-
3 files changed, 195 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a844179..8e1c1b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
TEZ-2859. TestMergeManager.testLocalDiskMergeMultipleTasks failing
@@ -196,6 +197,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
@@ -468,6 +470,7 @@ Release 0.6.3: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2855. Fix a potential NPE while routing VertexManager events.
TEZ-2716. DefaultSorter.isRleNeeded not thread safe
TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c9b4205..af55049 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -288,6 +288,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@VisibleForTesting
final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
+ @VisibleForTesting
+ final List<VertexManagerEvent> pendingVmEvents = new LinkedList<>();
LegacySpeculator speculator;
@@ -721,8 +724,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private final ProcessorDescriptor processorDescriptor;
private boolean vertexToBeReconfiguredByManager = false;
- AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
- AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
+ final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+ final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
@VisibleForTesting
Map<Vertex, Edge> sourceVertices;
@@ -2543,6 +2546,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
try {
vertexManager.initialize();
vmIsInitialized.set(true);
+ if (!pendingVmEvents.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " +
+ logIdentifier);
+ }
+ for (VertexManagerEvent vmEvent : pendingVmEvents) {
+ vertexManager.onVertexManagerEventReceived(vmEvent);
+ }
+ pendingVmEvents.clear();
+ }
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
LOG.error(msg, e);
@@ -4506,7 +4519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
}
if (target == this) {
- vertexManager.onVertexManagerEventReceived(vmEvent);
+ if (!vmIsInitialized.get()) {
+ // The VM hasn't been setup yet, defer event consumption
+ pendingVmEvents.add(vmEvent);
+ } else {
+ vertexManager.onVertexManagerEventReceived(vmEvent);
+ }
} else {
checkEventSourceMetadata(this, sourceMeta);
eventHandler.handle(new VertexEventRouteEvent(target
http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5f8b949..eb95bf9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.nio.ByteBuffer;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@@ -41,6 +42,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -432,7 +434,92 @@ public class TestVertexImpl {
.build();
return dag;
}
-
+
+ // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs)
+ // v1, v2 -> v3
+ private DAGPlan createDAGPlanWithCountingVM() {
+ LOG.info("Setting up dag plan with coutning VertexManager");
+ DAGPlan dag = DAGPlan.newBuilder()
+ .setName("dagWithCountingVM")
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
+ )
+ .addOutEdgeId("e1")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addOutEdgeId("e2")
+ .build()
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addInEdgeId("e2")
+ .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+ .setClassName(InvocationCountingVertexManager.class.getName()))
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .addEdge(
+ EdgePlan.newBuilder()
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+ .setInputVertexName("vertex2")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+ .setOutputVertexName("vertex3")
+ .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+ .setId("e2")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
+ .build();
+ return dag;
+ }
+
/**
* v1 -> v2
*/
@@ -5816,6 +5903,63 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
+ public void testVMEventBeforeVertexInitialized() throws Exception {
+ useCustomInitializer = true;
+ setupPreDagCreation();
+ dagPlan = createDAGPlanWithCountingVM();
+ setupPostDagCreation();
+
+ VertexImpl v1 = vertices.get("vertex1");
+ VertexImpl v2 = vertices.get("vertex2");
+ VertexImpl v3 = vertices.get("vertex3");
+ dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.await();
+ assertEquals(VertexState.INITED, v1.getState());
+ dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
+ dispatcher.await();
+ assertEquals(VertexState.RUNNING, v1.getState());
+
+ assertEquals(VertexState.NEW, v3.getState());
+ // Generate a VM event for v1, targeted at v3
+ VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+ TezEvent tezVmEvent = new TezEvent(vmEvent,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+ TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v1.getVertexId(), 1), 1)));
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+ dispatcher.await();
+
+ assertEquals(1, v3.pendingVmEvents.size());
+ assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+ // Initialize v2, which will trigger initialization of v3
+ dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(),
+ VertexEventType.V_INIT));
+ dispatcher.await();
+
+ assertEquals(VertexState.INITED, v3.getState());
+
+ // The VM event should have been processed.
+ assertEquals(0, v3.pendingVmEvents.size());
+ assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+ // Send another VM event - make sure it's processed without additional events.
+ vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+ tezVmEvent = new TezEvent(vmEvent,
+ new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+ TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(v1.getVertexId(), 1), 2)));
+ dispatcher.getEventHandler()
+ .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+ dispatcher.await();
+
+ assertEquals(0, v3.pendingVmEvents.size());
+ assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get());
+ }
+
+ @Test(timeout = 5000)
public void testExceptionFromVM_Initialize() throws TezException {
useCustomInitializer = true;
setupPreDagCreation();
@@ -6316,6 +6460,32 @@ public class TestVertexImpl {
}
}
+ public static class InvocationCountingVertexManager extends VertexManagerPlugin {
+
+ static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
+ static final AtomicInteger numInitializedInputs = new AtomicInteger(0);
+
+ public InvocationCountingVertexManager(VertexManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+
+ }
+
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+ numVmEventsReceived.incrementAndGet();
+ }
+
+ @Override
+ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+ List<Event> events) throws Exception {
+ numInitializedInputs.incrementAndGet();
+ }
+ }
+
@InterfaceAudience.Private
public static class VertexManagerWithException extends RootInputVertexManager{