You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/05 20:57:55 UTC
tez git commit: TEZ-2369. Add a few unit tests for
RootInputInitializerManager. Backport a findbugs warning fix from master.
(sseth)
Repository: tez
Updated Branches:
refs/heads/branch-0.6 71fa843c0 -> 4b6537699
TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport
a findbugs warning fix from master. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4b653769
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4b653769
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4b653769
Branch: refs/heads/branch-0.6
Commit: 4b65376991ae7c64aab10e50848814d7cb848cd8
Parents: 71fa843
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 5 11:57:42 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 5 11:57:42 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../app/dag/RootInputInitializerManager.java | 3 +-
.../dag/TestRootInputInitializerManager.java | 201 +++++++++++++++++++
3 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dc9a42e..c017808 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -188,6 +188,7 @@ TEZ-UI CHANGES (TEZ-8):
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-2369. Add a few unit tests for RootInputInitializerManager. Backport a findbugs warning fix from master.
TEZ-2379. org.apache.hadoop.yarn.state.InvalidStateTransitonException:
Invalid event: T_ATTEMPT_KILLED at KILLED.
TEZ-2397. Translation of LocalResources via Tez plan serialization can be lossy.
http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bdd3689..84379e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -405,7 +405,7 @@ public class RootInputInitializerManager {
"AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
}
Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
- Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId);
+ Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId());
if (successfulAttempt == null) {
successfulAttempt = attemptId;
vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);
@@ -425,6 +425,7 @@ public class RootInputInitializerManager {
if (taskAttemptIndex == successfulAttempt) {
toForwardEvents.add((InputInitializerEvent) tezEvent.getEvent());
}
+ // Drop all other events which have the same source task Id.
eventIterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4b653769/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
new file mode 100644
index 0000000..89eb2a6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestRootInputInitializerManager {
+
+ // Simple testing. No events if task doesn't succeed.
+ // Also exercises path where two attempts are reported as successful via the stateChangeNotifier.
+ // Primarily a failure scenario, when a Task moves back to running from success
+ // Order event1, success1, event2, success2
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testEventBeforeSuccess() throws Exception {
+ InputDescriptor id = mock(InputDescriptor.class);
+ InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+ RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+ new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+ InputInitializer initializer = mock(InputInitializer.class);
+ InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+ Vertex vertex = mock(Vertex.class);
+ StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+ AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+ initializerContext, vertex, stateChangeNotifier, appContext);
+
+ ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+ TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+ Vertex srcVertex = mock(Vertex.class);
+ Task srcTask1 = mock(Task.class);
+ doReturn(TaskState.RUNNING).when(srcTask1).getState();
+ doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+ when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+ String srcVertexName = "srcVertexName";
+ List<TezEvent> eventList = Lists.newLinkedList();
+
+
+ // First Attempt send event
+ TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+ EventMetaData sourceInfo11 =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+ srcTaskAttemptId11);
+ InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+ TezEvent te1 = new TezEvent(e1, sourceInfo11);
+ eventList.add(te1);
+ initializerWrapper.handleInputInitializerEvents(eventList);
+
+ verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+ eventList.clear();
+
+ // First attempt, Task success notification
+ initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+ ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+ verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+ List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+ assertEquals(1, invokedEvents.size());
+
+ reset(initializer);
+
+ // 2nd attempt send event
+ TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+ EventMetaData sourceInfo12 =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+ srcTaskAttemptId12);
+ InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+ TezEvent te2 = new TezEvent(e2, sourceInfo12);
+ eventList.add(te2);
+ initializerWrapper.handleInputInitializerEvents(eventList);
+
+ verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+ eventList.clear();
+ reset(initializer);
+
+ // 2nd attempt succeeded
+ initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+ verify(initializer, never()).handleInputInitializerEvent(argumentCaptor.capture());
+ }
+
+ // Order event1 success1, success2, event2
+ // Primarily a failure scenario, when a Task moves back to running from success
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testSuccessBeforeEvent() throws Exception {
+ InputDescriptor id = mock(InputDescriptor.class);
+ InputInitializerDescriptor iid = mock(InputInitializerDescriptor.class);
+ RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+ new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>("InputName", id, iid);
+
+ InputInitializer initializer = mock(InputInitializer.class);
+ InputInitializerContext initializerContext = mock(InputInitializerContext.class);
+ Vertex vertex = mock(Vertex.class);
+ StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+ AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+
+ RootInputInitializerManager.InitializerWrapper initializerWrapper =
+ new RootInputInitializerManager.InitializerWrapper(rootInput, initializer,
+ initializerContext, vertex, stateChangeNotifier, appContext);
+
+ ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID srcVertexId = TezVertexID.getInstance(dagId, 2);
+ TezTaskID srcTaskId1 = TezTaskID.getInstance(srcVertexId, 3);
+ Vertex srcVertex = mock(Vertex.class);
+ Task srcTask1 = mock(Task.class);
+ doReturn(TaskState.RUNNING).when(srcTask1).getState();
+ doReturn(srcTask1).when(srcVertex).getTask(srcTaskId1.getId());
+ when(appContext.getCurrentDAG().getVertex(any(String.class))).thenReturn(srcVertex);
+
+ String srcVertexName = "srcVertexName";
+ List<TezEvent> eventList = Lists.newLinkedList();
+
+
+ // First Attempt send event
+ TezTaskAttemptID srcTaskAttemptId11 = TezTaskAttemptID.getInstance(srcTaskId1, 1);
+ EventMetaData sourceInfo11 =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+ srcTaskAttemptId11);
+ InputInitializerEvent e1 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+ TezEvent te1 = new TezEvent(e1, sourceInfo11);
+ eventList.add(te1);
+ initializerWrapper.handleInputInitializerEvents(eventList);
+
+ verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+ eventList.clear();
+
+ // First attempt, Task success notification
+ initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId11.getId());
+ ArgumentCaptor<List> argumentCaptor = ArgumentCaptor.forClass(List.class);
+ verify(initializer, times(1)).handleInputInitializerEvent(argumentCaptor.capture());
+ List<InputInitializerEvent> invokedEvents = argumentCaptor.getValue();
+ assertEquals(1, invokedEvents.size());
+
+ reset(initializer);
+
+
+ TezTaskAttemptID srcTaskAttemptId12 = TezTaskAttemptID.getInstance(srcTaskId1, 2);
+ // 2nd attempt succeeded
+ initializerWrapper.onTaskSucceeded(srcVertexName, srcTaskId1, srcTaskAttemptId12.getId());
+ verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+
+ // 2nd attempt send event
+ EventMetaData sourceInfo12 =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, srcVertexName, null,
+ srcTaskAttemptId12);
+ InputInitializerEvent e2 = InputInitializerEvent.create("fakeVertex", "fakeInput", null);
+ TezEvent te2 = new TezEvent(e2, sourceInfo12);
+ eventList.add(te2);
+ initializerWrapper.handleInputInitializerEvents(eventList);
+
+ verify(initializer, never()).handleInputInitializerEvent(any(List.class));
+ }
+}