You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/10/01 01:47:33 UTC

tez git commit: TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 427fa620c -> 97e1d867d


TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/97e1d867
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/97e1d867
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/97e1d867

Branch: refs/heads/master
Commit: 97e1d867d961c5ae277a00f55757b29bd7f82a87
Parents: 427fa62
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 30 16:47:21 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 30 16:47:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  24 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 172 ++++++++++++++++++-
 3 files changed, 195 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a844179..8e1c1b3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2859. TestMergeManager.testLocalDiskMergeMultipleTasks failing
@@ -196,6 +197,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.
   TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker.
@@ -468,6 +470,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2855. Fix a potential NPE while routing VertexManager events.
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe
   TEZ-2758. Remove append API in RecoveryService after TEZ-1909.
   TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c9b4205..af55049 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -288,6 +288,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @VisibleForTesting
   final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
+  @VisibleForTesting
+  final List<VertexManagerEvent> pendingVmEvents = new LinkedList<>();
   
   LegacySpeculator speculator;
 
@@ -721,8 +724,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private final ProcessorDescriptor processorDescriptor;
   
   private boolean vertexToBeReconfiguredByManager = false;
-  AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
-  AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
+  final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
+  final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
@@ -2543,6 +2546,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     try {
       vertexManager.initialize();
       vmIsInitialized.set(true);
+      if (!pendingVmEvents.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " +
+              logIdentifier);
+        }
+        for (VertexManagerEvent vmEvent : pendingVmEvents) {
+          vertexManager.onVertexManagerEventReceived(vmEvent);
+        }
+        pendingVmEvents.clear();
+      }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
       LOG.error(msg, e);
@@ -4506,7 +4519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
               getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
         }
         if (target == this) {
-          vertexManager.onVertexManagerEventReceived(vmEvent);
+          if (!vmIsInitialized.get()) {
+            // The VertexManager has not been initialized yet; queue the event until it is.
+            pendingVmEvents.add(vmEvent);
+          } else {
+            vertexManager.onVertexManagerEventReceived(vmEvent);
+          }
         } else {
           checkEventSourceMetadata(this, sourceMeta);
           eventHandler.handle(new VertexEventRouteEvent(target

http://git-wip-us.apache.org/repos/asf/tez/blob/97e1d867/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5f8b949..eb95bf9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -41,6 +42,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -432,7 +434,92 @@ public class TestVertexImpl {
         .build();
     return dag;
   }
-  
+
+  // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs)
+  // v1, v2 -> v3
+  private DAGPlan createDAGPlanWithCountingVM() {
+    LOG.info("Setting up dag plan with coutning VertexManager");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("dagWithCountingVM")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x1.y1")
+                        .build()
+                )
+                .addOutEdgeId("e1")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x2.y2")
+                        .build()
+                )
+                .addOutEdgeId("e2")
+                .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex3")
+                .setType(PlanVertexType.NORMAL)
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(1)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x3.y3")
+                        .build()
+                )
+                .addInEdgeId("e1")
+                .addInEdgeId("e2")
+                .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+                    .setClassName(InvocationCountingVertexManager.class.getName()))
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+        )
+        .build();
+    return dag;
+  }
+
   /**
    * v1 -> v2
    */
@@ -5816,6 +5903,63 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
+  public void testVMEventBeforeVertexInitialized() throws Exception {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithCountingVM();
+    setupPostDagCreation();
+
+    VertexImpl v1 = vertices.get("vertex1");
+    VertexImpl v2 = vertices.get("vertex2");
+    VertexImpl v3 = vertices.get("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+    assertEquals(VertexState.INITED, v1.getState());
+    dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
+    dispatcher.await();
+    assertEquals(VertexState.RUNNING, v1.getState());
+
+    assertEquals(VertexState.NEW, v3.getState());
+    // Generate a VM event originating from v1, targeted at v3
+    VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+    TezEvent tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 1)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(1, v3.pendingVmEvents.size());
+    assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Initialize v2, which will trigger initialization of v3
+    dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(),
+        VertexEventType.V_INIT));
+    dispatcher.await();
+
+    assertEquals(VertexState.INITED, v3.getState());
+
+    // The VM event should have been processed.
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get());
+
+    // Send another VM event - it should now be delivered directly instead of being queued.
+    vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0]));
+    tezVmEvent = new TezEvent(vmEvent,
+        new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null,
+            TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(v1.getVertexId(), 1), 2)));
+    dispatcher.getEventHandler()
+        .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
+    dispatcher.await();
+
+    assertEquals(0, v3.pendingVmEvents.size());
+    assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get());
+  }
+
+  @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws TezException {
     useCustomInitializer = true;
     setupPreDagCreation();
@@ -6316,6 +6460,32 @@ public class TestVertexImpl {
     }
   }
 
+  public static class InvocationCountingVertexManager extends VertexManagerPlugin {
+
+    static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
+    static final AtomicInteger numInitializedInputs = new AtomicInteger(0);
+
+    public InvocationCountingVertexManager(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() throws Exception {
+
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+      numVmEventsReceived.incrementAndGet();
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                        List<Event> events) throws Exception {
+      numInitializedInputs.incrementAndGet();
+    }
+  }
+
   @InterfaceAudience.Private
   public static class VertexManagerWithException extends RootInputVertexManager{