You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2014/11/14 08:30:37 UTC
tez git commit: Revert "TEZ-1737. Should add taskNum in
VertexFinishedEvent (zjffdu)"
Repository: tez
Updated Branches:
refs/heads/branch-0.5 202ce9767 -> 3197cf086
Revert "TEZ-1737. Should add taskNum in VertexFinishedEvent (zjffdu)"
This reverts commit 9f762dda3a81f984901de238ac830e775979095a.
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3197cf08
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3197cf08
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3197cf08
Branch: refs/heads/branch-0.5
Commit: 3197cf086030d80cc9a654d1734f9dd33c9305e6
Parents: 202ce97
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Nov 14 15:28:34 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Nov 14 15:28:34 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +-
.../dag/history/events/VertexFinishedEvent.java | 10 +--
tez-dag/src/main/proto/HistoryEvents.proto | 1 -
.../dag/app/dag/impl/TestVertexRecovery.java | 89 +-------------------
.../TestHistoryEventsProtoConversion.java | 4 +-
.../impl/TestHistoryEventJsonConversion.java | 3 +-
.../ats/TestHistoryEventTimelineConversion.java | 4 +-
8 files changed, 10 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e4222fe..62833fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,6 @@ ALL CHANGES:
TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
TEZ-1761. TestRecoveryParser::testGetLastInProgressDAG fails in similar manner to TEZ-1686.
- TEZ-1737. Should add taskNum in VertexFinishedEvent.
TEZ-1770. Handle ConnectExceptions correctly when establishing connections to an NM which may be down.
TEZ-1774. AppLaunched event for Timeline does not have start time set.
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 681fbaa..593ecca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1135,7 +1135,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
} else {
vertexCompleteSeen = true;
}
- numTasks = finishedEvent.getNumTasks();
recoveryCommitInProgress = false;
recoveredState = finishedEvent.getState();
diagnostics.add(finishedEvent.getDiagnostics());
@@ -1611,7 +1610,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get());
taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get());
- VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested,
+ VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
getAllCounters(), getVertexStats(), taskStats);
this.appContext.getHistoryHandler().handleCriticalEvent(
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index d1d7eef..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -43,7 +43,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
private String vertexName;
- private int numTasks;
private long initRequestedTime;
private long initedTime;
private long startRequestedTime;
@@ -56,14 +55,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private VertexStats vertexStats;
private Map<String, Integer> vertexTaskStats;
- public VertexFinishedEvent(TezVertexID vertexId, String vertexName, int numTasks, long initRequestedTime,
+ public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
long initedTime, long startRequestedTime, long startedTime,
long finishTime, VertexState state, String diagnostics,
TezCounters counters, VertexStats vertexStats,
Map<String, Integer> vertexTaskStats) {
this.vertexName = vertexName;
this.vertexID = vertexId;
- this.numTasks = numTasks;
this.initRequestedTime = initRequestedTime;
this.initedTime = initedTime;
this.startRequestedTime = startRequestedTime;
@@ -184,17 +182,12 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
return vertexTaskStats;
}
- public int getNumTasks() {
- return numTasks;
- }
-
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
VertexFinishStateProto finishStateProto =
VertexFinishStateProto.newBuilder()
.setState(state.ordinal())
.setVertexId(vertexID.toString())
- .setNumTasks(numTasks)
.build();
SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
@@ -212,7 +205,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
VertexFinishStateProto.parseFrom(proto.getEventPayload());
this.vertexID = TezVertexID.fromString(finishStateProto.getVertexId());
this.state = VertexState.values()[finishStateProto.getState()];
- this.numTasks = finishStateProto.getNumTasks();
this.finishTime = proto.getTimestamp();
this.fromSummary = true;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5d18b0a..93f217f 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -198,5 +198,4 @@ message SummaryEventProto {
message VertexFinishStateProto {
optional string vertex_id = 1;
optional int32 state = 2;
- optional int32 num_tasks = 3;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 1b8ac2f..8fa574c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -47,7 +47,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
-import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
@@ -68,14 +67,11 @@ import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -219,49 +215,6 @@ public class TestVertexRecovery {
return dag;
}
- private DAGPlan createDAGPlanSingleVertex() {
- DAGPlan dag =
- DAGPlan
- .newBuilder()
- .setName("testverteximpl")
- .addVertex(
- VertexPlan
- .newBuilder()
- .setName("vertex1")
- .setType(PlanVertexType.NORMAL)
- .addTaskLocationHint(
- PlanTaskLocationHint.newBuilder().addHost("host1")
- .addRack("rack1").build())
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder().setNumTasks(-1)
- .setVirtualCores(4).setMemoryMb(1024)
- .setJavaOpts("").setTaskModule("x1.y1").build())
- .addInputs(RootInputLeafOutputProto.newBuilder()
- .setIODescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("input").build())
- .setName("inputx")
- .setControllerDescriptor(
- TezEntityDescriptorProto
- .newBuilder()
- .setClassName("inputinitlizer"))
- .build())
- .addOutputs(
- DAGProtos.RootInputLeafOutputProto
- .newBuilder()
- .setIODescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("output").build())
- .setName("outputx")
- .setControllerDescriptor(
- TezEntityDescriptorProto
- .newBuilder()
- .setClassName(
- CountingOutputCommitter.class.getName())))
- .build()).build();
- return dag;
- }
-
class VertexEventHanlder implements EventHandler<VertexEvent> {
private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -363,44 +316,6 @@ public class TestVertexRecovery {
}
/**
- * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
- * @throws IOException
- */
- @Test
- public void testRecovery_Desired_SUCCEEDED_OnlySummaryLog() throws IOException {
- DAGPlan dagPlan = createDAGPlanSingleVertex();
- dag =
- new DAGImpl(dagId, new Configuration(), dagPlan,
- dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
- new Credentials(), new SystemClock(), user,
- mock(TaskHeartbeatHandler.class), mockAppContext);
- when(mockAppContext.getCurrentDAG()).thenReturn(dag);
- dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
-
- VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
- VertexFinishedEvent vertexFinishEvent = new VertexFinishedEvent();
- vertexFinishEvent.fromSummaryProtoStream(SummaryEventProto.newBuilder()
- .setDagId(dag.getID().toString())
- .setEventType(HistoryEventType.VERTEX_FINISHED.ordinal())
- .setTimestamp(100L)
- .setEventPayload(VertexFinishStateProto.newBuilder()
- .setNumTasks(2)
- .setState(VertexState.SUCCEEDED.ordinal())
- .setVertexId(vertex1.getVertexId().toString()).build().toByteString())
- .build());
- VertexState recoveredState = vertex1.restoreFromEvent(vertexFinishEvent);
- // numTasks is recovered from summary log
- assertEquals(2, vertex1.numTasks);
- assertEquals(VertexState.SUCCEEDED, recoveredState);
- vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
- VertexState.SUCCEEDED));
- dispatcher.await();
- assertEquals(VertexState.SUCCEEDED, vertex1.getState());
- assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
- assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
- }
-
- /**
* vertex1(New) -> StartRecoveryTransition(FAILED)
*/
@Test
@@ -661,7 +576,7 @@ public class TestVertexRecovery {
long finishTime = startTime + 100L;
recoveredState =
vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
- "vertex1", 1, initRequestedTime, initedTime, startRequestedTime,
+ "vertex1", initRequestedTime, initedTime, startRequestedTime,
startTime, finishTime, VertexState.SUCCEEDED, "",
new TezCounters(), new VertexStats(), null));
assertEquals(finishTime, vertex1.finishTime);
@@ -932,7 +847,7 @@ public class TestVertexRecovery {
assertEquals(VertexState.RUNNING, recoveredState);
recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
- "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+ "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
initRequestedTime + 400L, initRequestedTime + 500L,
VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
assertEquals(VertexState.SUCCEEDED, recoveredState);
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 8913287..a7a23db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
VertexFinishedEvent event =
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
- "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+ "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
null, null, null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
@@ -376,7 +376,7 @@ public class TestHistoryEventsProtoConversion {
VertexFinishedEvent event =
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
- "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+ "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
"diagnose", new TezCounters(), new VertexStats(), null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index a20c9fe..c3d51c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -138,7 +139,7 @@ public class TestHistoryEventJsonConversion {
event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 10);
break;
case VERTEX_FINISHED:
- event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+ event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
null, null, null, null);
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index ce47820..1347d28 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -140,7 +140,7 @@ public class TestHistoryEventTimelineConversion {
event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
break;
case VERTEX_FINISHED:
- event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+ event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
null, null, null, null);
break;
@@ -452,7 +452,7 @@ public class TestHistoryEventTimelineConversion {
taskStats.put("BAR", 200);
VertexStats vertexStats = new VertexStats();
- VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime,
+ VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", initRequestedTime,
initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
"diagnostics", null, vertexStats, taskStats);