You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/05 20:56:18 UTC

tez git commit: TEZ-2369. Add a few unit tests for RootInputInitializerManager. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 0a6a7d3b6 -> 8c44f2484


TEZ-2369. Add a few unit tests for RootInputInitializerManager. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8c44f248
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8c44f248
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8c44f248

Branch: refs/heads/master
Commit: 8c44f2484b626ce9a7a3408bd3b0d7e0df2b1a24
Parents: 0a6a7d3
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 5 11:55:47 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 5 11:55:47 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../app/dag/RootInputInitializerManager.java    |   1 +
 .../dag/TestRootInputInitializerManager.java    | 201 +++++++++++++++++++
 3 files changed, 203 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba03aa3..d7a1e1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -339,6 +339,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2369. Add a few unit tests for RootInputInitializerManager.
   TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
     Invalid event: T_ATTEMPT_KILLED at KILLED.
   TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.

http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 7156e60..4ee00fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -425,6 +425,7 @@ public class RootInputInitializerManager {
             if (taskAttemptIndex == successfulAttempt) {
               toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
             }
+            // Drop all other events which have the same source task Id.
             eventIterator.remove();
           }
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/8c44f248/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
new file mode 100644
index 0000000..89eb2a6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestRootInputInitializerManager {
+
+  // Simple testing. No events if task doesn't succeed.
+  // Also exercises path where two attempts are reported as successful via the stateChangeNotifier.
+  // Primarily a failure scenario, when a Task moves back to running from success
+  // Order event1, success1, event2, success2
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testEventBeforeSuccess() throws Exception {
+    InputDescriptor id = mock(InputDescriptor.class);
+    InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+    RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+    InputInitializer initializer = mock(InputInitializer.class);
+    InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+    Vertex vertex = mock(Vertex.class);
+    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+            initializerContext, vertex, stateChangeNotifier, appContext);
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+    TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+    Vertex srcVertex = mock(Vertex.class);
+    Task srcTask1 = mock(Task.class);
+    doReturn(TaskState.RUNNING).when(srcTask1).getState();
+    doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+    when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+    String srcVertexName = "srcVertexName";
+    List<TezEvent> eventList = Lists.newLinkedList();
+
+
+    // First Attempt send event
+    TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+    EventMetaData sourceInfo11 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId11);
+    InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te1 = new TezEvent(e1, sourceInfo11);
+    eventList.add(te1);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+
+    // First attempt, Task success notification
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+    ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+    verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+    List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+    assertEquals(1, invokedEvents.size());
+
+    reset(initializer);
+
+    // 2nd attempt send event
+    TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+    EventMetaData sourceInfo12 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId12);
+    InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te2 = new TezEvent(e2, sourceInfo12);
+    eventList.add(te2);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+    reset(initializer);
+
+    // 2nd attempt succeeded
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+    verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture());
+  }
+
+  // Order event1 success1, success2, event2
+  // Primarily a failure scenario, when a Task moves back to running from success
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testSuccessBeforeEvent() throws Exception {
+    InputDescriptor id = mock(InputDescriptor.class);
+    InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+    RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+        new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+    InputInitializer initializer = mock(InputInitializer.class);
+    InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+    Vertex vertex = mock(Vertex.class);
+    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+    AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+    RootInputInitializerManager.InitializerWrapper initializerWrapper =
+        new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+            initializerContext, vertex, stateChangeNotifier, appContext);
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+    TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+    Vertex srcVertex = mock(Vertex.class);
+    Task srcTask1 = mock(Task.class);
+    doReturn(TaskState.RUNNING).when(srcTask1).getState();
+    doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+    when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+    String srcVertexName = "srcVertexName";
+    List<TezEvent> eventList = Lists.newLinkedList();
+
+
+    // First Attempt send event
+    TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+    EventMetaData sourceInfo11 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId11);
+    InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te1 = new TezEvent(e1, sourceInfo11);
+    eventList.add(te1);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+    eventList.clear();
+
+    // First attempt, Task success notification
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+    ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+    verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+    List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+    assertEquals(1, invokedEvents.size());
+
+    reset(initializer);
+
+
+    TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+    // 2nd attempt succeeded
+    initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+
+    // 2nd attempt send event
+    EventMetaData sourceInfo12 =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+            srcTaskAttemptId12);
+    InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+    TezEvent te2 = new TezEvent(e2, sourceInfo12);
+    eventList.add(te2);
+    initializerWrapper.handleInputInitializerEvents(eventList);
+
+    verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+  }
+}