You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:25 UTC
[5/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign
(zjffdu)
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index be67cb2..9a45859 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -19,8 +19,11 @@
package org.apache.tez.dag.app;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -29,11 +32,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
import org.apache.tez.dag.app.RecoveryParser.DAGSummaryData;
-import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
+import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.TaskRecoveryData;
+import org.apache.tez.dag.app.RecoveryParser.VertexRecoveryData;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TestDAGImpl;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -42,10 +55,27 @@ import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.*;
+import com.google.common.collect.Lists;
+
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -75,6 +105,7 @@ public class TestRecoveryParser {
mockDAGImpl = mock(DAGImpl.class);
when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl);
parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3);
+ LogManager.getRootLogger().setLevel(Level.DEBUG);
}
private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) {
@@ -155,14 +186,14 @@ public class TestRecoveryParser {
new DAGStartedEvent(dagID, 1L, "user", "dag1")));
rService.stop();
- RecoveredDAGData dagData = parser.parseRecoveryData();
+ DAGRecoveryData dagData = parser.parseRecoveryData();
assertEquals(true, dagData.nonRecoverable);
assertTrue(dagData.reason.contains("DAG Commit was in progress, not recoverable,"));
// DAGSubmittedEvent is handled but DAGInitializedEvent and DAGStartedEvent in the next attempt are both skipped
// due to the dag is not recoerable.
verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
- verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGInitializedEvent.class));
- verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+ assertNull(dagData.getDAGInitializedEvent());
+ assertNull(dagData.getDAGStartedEvent());
}
// skipAllOtherEvents due to dag finished
@@ -202,7 +233,7 @@ public class TestRecoveryParser {
new DAGStartedEvent(dagID, 1L, "user", "dag1")));
rService.stop();
- RecoveredDAGData dagData = parser.parseRecoveryData();
+ DAGRecoveryData dagData = parser.parseRecoveryData();
assertEquals(false, dagData.nonRecoverable);
assertEquals(DAGState.FAILED, dagData.dagState);
assertEquals(true, dagData.isCompleted);
@@ -210,9 +241,8 @@ public class TestRecoveryParser {
verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
// DAGInitializedEvent may not been handled before DAGFinishedEvent,
// because DAGFinishedEvent's writeToRecoveryImmediately is true
- verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGFinishedEvent.class));
- // DAGStartedEvent is skipped due to it is after DAGFinishedEvent
- verify(dagData.recoveredDAG, never()).restoreFromEvent(isA(DAGStartedEvent.class));
+ assertNotNull(dagData.getDAGFinishedEvent());
+ assertNull(dagData.getDAGStartedEvent());
}
@Test(timeout = 5000)
@@ -250,13 +280,13 @@ public class TestRecoveryParser {
rService.stop();
// corrupted last records will be skipped but the whole recovery logs will be read
- RecoveredDAGData dagData = parser.parseRecoveryData();
+ DAGRecoveryData dagData = parser.parseRecoveryData();
assertEquals(false, dagData.isCompleted);
assertEquals(null, dagData.reason);
assertEquals(false, dagData.nonRecoverable);
// verify DAGSubmitedEvent & DAGInititlizedEvent is handled.
verify(mockAppMaster).createDAG(any(DAGPlan.class),any(TezDAGID.class));
- verify(dagData.recoveredDAG).restoreFromEvent(isA(DAGInitializedEvent.class));
+ assertNotNull(dagData.getDAGInitializedEvent());
}
@Test(timeout = 5000)
@@ -293,4 +323,434 @@ public class TestRecoveryParser {
}
}
+ @Test(timeout=5000)
+ public void testRecoverableSummary_DAGInCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true); // presumably makes stop() flush queued events - confirm
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: DAG commit starts but no DAGFinishedEvent follows; all other events are omitted on purpose.
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGCommitStartedEvent(dagID, 0L)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertTrue(dagData.nonRecoverable); // an unfinished DAG commit must make the DAG non-recoverable
+ assertTrue(dagData.reason.contains("DAG Commit was in progress"));
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableSummary_DAGFinishCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, appAttemptId,
+ null, "user", new Configuration(), null)));
+ // Scenario: DAG commit starts and a DAGFinishedEvent(FAILED) follows; other events are omitted on purpose.
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGCommitStartedEvent(dagID, 0L)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGFinishedEvent(dagID, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null,
+ appAttemptId)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertEquals(DAGState.FAILED, dagData.dagState);
+ assertFalse(dagData.nonRecoverable); // the commit finished, so it must not block recovery
+ assertNull(dagData.reason);
+ assertTrue(dagData.isCompleted);
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableSummary_VertexInCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: vertex 0 commit starts but no VertexFinishedEvent follows; other events are omitted on purpose.
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertTrue(dagData.nonRecoverable); // an unfinished vertex commit must make the DAG non-recoverable
+ assertTrue(dagData.reason.contains("Vertex Commit was in progress"));
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableSummary_VertexFinishCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: vertex 0 commit starts and the vertex then finishes SUCCEEDED; other events are omitted on purpose.
+ TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexCommitStartedEvent(vertexId, 0L)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L,
+ 0L, 0L, 0L, VertexState.SUCCEEDED,
+ "", null, null, null)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertFalse(dagData.nonRecoverable); // the vertex commit completed, so it must not block recovery
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableSummary_VertexGroupInCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: commit of "group_1" (vertices 0 and 1) starts but never finishes; other events are omitted on purpose.
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexGroupCommitStartedEvent(dagID, "group_1",
+ Lists.newArrayList(TezVertexID.getInstance(dagID, 0), TezVertexID.getInstance(dagID, 1)), 0L)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertTrue(dagData.nonRecoverable); // an unfinished vertex-group commit must make the DAG non-recoverable
+ assertTrue(dagData.reason.contains("Vertex Group Commit was in progress"));
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableSummary_VertexGroupFinishCommitting() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: commit of "group_1" (v0, v1) starts and finishes; other events are omitted on purpose.
+ TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
+ TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexGroupCommitStartedEvent(dagID, "group_1",
+ Lists.newArrayList(v0, v1), 0L)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexGroupCommitFinishedEvent(dagID, "group_1",
+ Lists.newArrayList(v0, v1), 0L)));
+ // Also write a VertexFinishedEvent for each group member; otherwise the
+ // check against non-summary events still reports the DAG as non-recoverable.
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexFinishedEvent(v0, "v0", 10, 0L, 0L,
+ 0L, 0L, 0L, VertexState.SUCCEEDED,
+ "", null, null, null)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexFinishedEvent(v1, "v1", 10, 0L, 0L,
+ 0L, 0L, 0L, VertexState.SUCCEEDED,
+ "", null, null, null)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertEquals(dagID, dagData.recoveredDagID);
+ assertFalse(dagData.nonRecoverable); // group commit finished and both members finished
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableNonSummary1() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ // MockRecoveryService drops every non-summary event, so only summary records are persisted
+ MockRecoveryService rService = new MockRecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // Scenario: vertex 0 commit starts and finishes, but its non-summary events are dropped by the mock.
+ TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexCommitStartedEvent(vertexId, 0L)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L,
+ 0L, 0L, 0L, VertexState.SUCCEEDED,
+ "", null, null, null)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertTrue(dagData.nonRecoverable); // committed vertex without its full recovery events cannot be rebuilt
+ assertTrue(dagData.reason.contains("Vertex has been committed, but its full recovery events are not seen"));
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverableNonSummary2() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ // MockRecoveryService will skip the non-summary event
+ MockRecoveryService rService = new MockRecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // write a DAGSubmittedEvent first to initialize summaryStream
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+ null, "user", new Configuration(), null)));
+ // It should be fine to skip other events, just for testing.
+ // Scenario: commit of "group_1" (v0, v1) starts and finishes; the mock drops non-summary events.
+ TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
+ TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexGroupCommitStartedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+ rService.handle(new DAGHistoryEvent(dagID,
+ new VertexGroupCommitFinishedEvent(dagID, "group_1", Lists.newArrayList(v0, v1), 0L)));
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertTrue(dagData.nonRecoverable);
+ assertTrue(dagData.reason.contains("Vertex has been committed as member of vertex group"
+ + ", but its full recovery events are not seen"));
+ }
+
+ @Test(timeout=5000)
+ public void testRecoveryData() throws IOException {
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+ when(appContext.getClock()).thenReturn(new SystemClock());
+ when(mockDAGImpl.getID()).thenReturn(dagID);
+
+ RecoveryService rService = new RecoveryService(appContext);
+ Configuration conf = new Configuration();
+ conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+ rService.init(conf);
+ rService.start();
+
+ DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+ // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
+ rService.handle(new DAGHistoryEvent(dagID,
+ new DAGSubmittedEvent(dagID, 1L, dagPlan, appAttemptId,
+ null, "user", new Configuration(), null)));
+ DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L,
+ "user", "dagName", null);
+ DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
+ rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent));
+ rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent));
+
+ // 3 vertices of this dag: v0, v1, v2
+ TezVertexID v0Id = TezVertexID.getInstance(dagID, 0);
+ TezVertexID v1Id = TezVertexID.getInstance(dagID, 1);
+ TezVertexID v2Id = TezVertexID.getInstance(dagID, 2);
+ // v0 VertexInitializedEvent
+ VertexInitializedEvent v0InitedEvent = new VertexInitializedEvent(v0Id, "v0", 200L, 400L, 2, null, null, null);
+ rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent));
+ // v1 VertexFinishedEvent(KILLED)
+ VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1", 2, 300L, 400L,
+ 500L, 600L, 700L, VertexState.KILLED,
+ "", null, null, null);
+ rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent));
+ // v2 VertexInitializedEvent -> VertexStartedEvent
+ List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+ new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
+ VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "v2", 200L, 300L,
+ 2, null, null, initGeneratedEvents);
+ VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L);
+ rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent));
+ rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent));
+
+ // 3 tasks of v2 (all task/attempt events below belong to vertex "v2")
+ TezTaskID t0v2Id = TezTaskID.getInstance(v2Id, 0);
+ TezTaskID t1v2Id = TezTaskID.getInstance(v2Id, 1);
+ TezTaskID t2v2Id = TezTaskID.getInstance(v2Id, 2);
+ // t0v2 TaskStartedEvent
+ TaskStartedEvent t0v2StartedEvent = new TaskStartedEvent(t0v2Id, "v2", 400L, 5000L);
+ rService.handle(new DAGHistoryEvent(dagID, t0v2StartedEvent));
+ // t1v2 TaskFinishedEvent
+ TaskFinishedEvent t1v2FinishedEvent = new TaskFinishedEvent(t1v2Id, "v2",
+ 0L, 0L, null, TaskState.KILLED, "", null, 4);
+ rService.handle(new DAGHistoryEvent(dagID, t1v2FinishedEvent));
+ // t2v2 TaskStartedEvent -> TaskFinishedEvent
+ TaskStartedEvent t2v2StartedEvent = new TaskStartedEvent(t2v2Id, "v2", 400L, 500L);
+ rService.handle(new DAGHistoryEvent(dagID, t2v2StartedEvent));
+ TaskFinishedEvent t2v2FinishedEvent = new TaskFinishedEvent(t2v2Id, "v2",
+ 0L, 0L, null, TaskState.SUCCEEDED, "", null, 4);
+ rService.handle(new DAGHistoryEvent(dagID, t2v2FinishedEvent));
+
+ // attempts under t0v2
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+ NodeId nodeId = NodeId.newInstance("localhost", 9999);
+ TezTaskAttemptID ta0t0v2Id = TezTaskAttemptID.getInstance(t0v2Id, 0);
+ TaskAttemptStartedEvent ta0t0v2StartedEvent = new TaskAttemptStartedEvent(
+ ta0t0v2Id, "v2", 0L, containerId,
+ nodeId, "", "", "");
+ rService.handle(new DAGHistoryEvent(dagID, ta0t0v2StartedEvent));
+ // attempts under t2v2
+ TezTaskAttemptID ta0t2v2Id = TezTaskAttemptID.getInstance(t2v2Id, 0);
+ TaskAttemptStartedEvent ta0t2v2StartedEvent = new TaskAttemptStartedEvent(
+ ta0t2v2Id, "v2", 500L, containerId,
+ nodeId, "", "", "");
+ rService.handle(new DAGHistoryEvent(dagID, ta0t2v2StartedEvent));
+ TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent(
+ ta0t2v2Id, "v2", 500L, 600L,
+ TaskAttemptState.SUCCEEDED, null, "", null,
+ null, null, 0L, null, 0L);
+ rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent));
+
+ rService.stop();
+
+ DAGRecoveryData dagData = parser.parseRecoveryData();
+ assertFalse(dagData.nonRecoverable);
+ // History events have no equals(), so only the init/start/finish time of each recovered event is compared
+ assertEquals(dagInitedEvent.getInitTime(), dagData.getDAGInitializedEvent().getInitTime());
+ assertEquals(dagStartedEvent.getStartTime(), dagData.getDAGStartedEvent().getStartTime());
+ assertNull(dagData.getDAGFinishedEvent());
+
+ VertexRecoveryData v0Data = dagData.getVertexRecoveryData(v0Id);
+ VertexRecoveryData v1Data = dagData.getVertexRecoveryData(v1Id);
+ VertexRecoveryData v2Data = dagData.getVertexRecoveryData(v2Id);
+ assertNotNull(v0Data);
+ assertNotNull(v1Data);
+ assertNotNull(v2Data);
+ assertEquals(v0InitedEvent.getInitedTime(), v0Data.getVertexInitedEvent().getInitedTime());
+ assertNull(v0Data.getVertexStartedEvent());
+ assertNull(v1Data.getVertexInitedEvent());
+ assertEquals(v1FinishedEvent.getFinishTime(), v1Data.getVertexFinishedEvent().getFinishTime());
+ assertEquals(v2InitedEvent.getInitedTime(), v2Data.getVertexInitedEvent().getInitedTime());
+ assertEquals(v2StartedEvent.getStartTime(), v2Data.getVertexStartedEvent().getStartTime());
+
+ TaskRecoveryData t0v2Data = dagData.getTaskRecoveryData(t0v2Id);
+ TaskRecoveryData t1v2Data = dagData.getTaskRecoveryData(t1v2Id);
+ TaskRecoveryData t2v2Data = dagData.getTaskRecoveryData(t2v2Id);
+ assertNotNull(t0v2Data);
+ assertNotNull(t1v2Data);
+ assertNotNull(t2v2Data);
+ assertEquals(t0v2StartedEvent.getStartTime(), t0v2Data.getTaskStartedEvent().getStartTime());
+ assertNull(t0v2Data.getTaskFinishedEvent());
+ assertEquals(t1v2FinishedEvent.getFinishTime(), t1v2Data.getTaskFinishedEvent().getFinishTime());
+ assertNull(t1v2Data.getTaskStartedEvent());
+ assertEquals(t2v2StartedEvent.getStartTime(), t2v2Data.getTaskStartedEvent().getStartTime());
+ assertEquals(t2v2FinishedEvent.getFinishTime(), t2v2Data.getTaskFinishedEvent().getFinishTime());
+
+ TaskAttemptRecoveryData ta0t0v2Data = dagData.getTaskAttemptRecoveryData(ta0t0v2Id);
+ TaskAttemptRecoveryData ta0t2v2Data = dagData.getTaskAttemptRecoveryData(ta0t2v2Id);
+ assertNotNull(ta0t0v2Data);
+ assertNotNull(ta0t2v2Data);
+ assertEquals(ta0t0v2StartedEvent.getStartTime(), ta0t0v2Data.getTaskAttemptStartedEvent().getStartTime());
+ assertNull(ta0t0v2Data.getTaskAttemptFinishedEvent());
+ assertEquals(ta0t2v2StartedEvent.getStartTime(), ta0t2v2Data.getTaskAttemptStartedEvent().getStartTime());
+ assertEquals(ta0t2v2FinishedEvent.getFinishTime(), ta0t2v2Data.getTaskAttemptFinishedEvent().getFinishTime());
+ }
+
+ // Test double: summary events are still written to the recovery log, but
+ // non-summary recovery events never reach the file system (e.g. HDFS).
+ public static class MockRecoveryService extends RecoveryService{
+
+ public MockRecoveryService(AppContext appContext) {
+ super(appContext);
+ }
+
+ @Override
+ protected void handleRecoveryEvent(DAGHistoryEvent event)
+ throws IOException {
+ // Intentionally a no-op: drop non-summary events instead of persisting them.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 17fa4d9..3f80928 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Random;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@@ -68,7 +69,9 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
@@ -83,6 +86,8 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -235,15 +240,18 @@ public class TestTaskCommunicatorManager1 {
@Test (timeout = 5000)
public void testTaskEventRouting() throws Exception {
List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
- new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
- new TezEvent(new TaskAttemptCompletedEvent(), null)
+ new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), new EventMetaData(EventProducerConsumerType.PROCESSOR,
+ "v1", "v2", taskAttemptID)),
+ new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventProducerConsumerType.OUTPUT,
+ "v1", "v2", taskAttemptID)),
+ new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+ "v1", "v2", taskAttemptID))
);
generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
- verify(eventHandler, times(2)).handle(arg.capture());
+ verify(eventHandler, times(4)).handle(arg.capture());
final List<Event> argAllValues = arg.getAllValues();
final Event statusUpdateEvent = argAllValues.get(0);
@@ -251,28 +259,33 @@ public class TestTaskCommunicatorManager1 {
statusUpdateEvent.getType());
assertEquals(false, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
- final Event vertexEvent = argAllValues.get(1);
- final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
- assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
- vertexEvent.getType());
+ final TaskAttemptEventTezEventUpdate taEvent = (TaskAttemptEventTezEventUpdate)argAllValues.get(1);
+ assertEquals(1, taEvent.getTezEvents().size());
+ assertEquals(EventType.DATA_MOVEMENT_EVENT,
+ taEvent.getTezEvents().get(0).getEventType());
+
+ final TaskAttemptEvent taCompleteEvent = (TaskAttemptEvent)argAllValues.get(2);
+ assertEquals(TaskAttemptEventType.TA_DONE, taCompleteEvent.getType());
+ final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)argAllValues.get(3);
+ assertEquals(1, vertexRouteEvent.getEvents().size());
assertEquals(EventType.DATA_MOVEMENT_EVENT,
vertexRouteEvent.getEvents().get(0).getEventType());
- assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
- vertexRouteEvent.getEvents().get(1).getEventType());
}
@Test (timeout = 5000)
public void testTaskEventRoutingWithReadError() throws Exception {
List<TezEvent> events = Arrays.asList(
new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null, false), null),
- new TezEvent(InputReadErrorEvent.create("", 0, 0), null),
- new TezEvent(new TaskAttemptCompletedEvent(), null)
+ new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventProducerConsumerType.INPUT,
+ "v2", "v1", taskAttemptID)),
+ new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+ "v1", "v2", taskAttemptID))
);
generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
- verify(eventHandler, times(2)).handle(arg.capture());
+ verify(eventHandler, times(3)).handle(arg.capture());
final List<Event> argAllValues = arg.getAllValues();
final Event statusUpdateEvent = argAllValues.get(0);
@@ -280,22 +293,24 @@ public class TestTaskCommunicatorManager1 {
statusUpdateEvent.getType());
assertEquals(true, ((TaskAttemptEventStatusUpdate)statusUpdateEvent).getReadErrorReported());
- final Event vertexEvent = argAllValues.get(1);
+ final Event taFinishedEvent = argAllValues.get(1);
+ assertEquals("Second event should be TA_DONE", TaskAttemptEventType.TA_DONE,
+ taFinishedEvent.getType());
+
+ final Event vertexEvent = argAllValues.get(2);
final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
- assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+ assertEquals("Third event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
vertexEvent.getType());
assertEquals(EventType.INPUT_READ_ERROR_EVENT,
vertexRouteEvent.getEvents().get(0).getEventType());
- assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
- vertexRouteEvent.getEvents().get(1).getEventType());
-
}
@Test (timeout = 5000)
public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskAttemptCompletedEvent(), null)
+ new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventProducerConsumerType.SYSTEM,
+ "v1", "v2", taskAttemptID))
);
generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
@@ -304,7 +319,8 @@ public class TestTaskCommunicatorManager1 {
final List<Event> argAllValues = arg.getAllValues();
final Event event = argAllValues.get(0);
- assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+ // Route to TaskAttempt directly rather than through Vertex
+ assertEquals("only event should be route event", TaskAttemptEventType.TA_DONE,
event.getType());
}