You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2014/11/14 08:30:37 UTC

tez git commit: Revert "TEZ-1737. Should add taskNum in VertexFinishedEvent (zjffdu)"

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 202ce9767 -> 3197cf086


Revert "TEZ-1737. Should add taskNum in VertexFinishedEvent (zjffdu)"

This reverts commit 9f762dda3a81f984901de238ac830e775979095a.

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3197cf08
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3197cf08
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3197cf08

Branch: refs/heads/branch-0.5
Commit: 3197cf086030d80cc9a654d1734f9dd33c9305e6
Parents: 202ce97
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Nov 14 15:28:34 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Nov 14 15:28:34 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 +-
 .../dag/history/events/VertexFinishedEvent.java | 10 +--
 tez-dag/src/main/proto/HistoryEvents.proto      |  1 -
 .../dag/app/dag/impl/TestVertexRecovery.java    | 89 +-------------------
 .../TestHistoryEventsProtoConversion.java       |  4 +-
 .../impl/TestHistoryEventJsonConversion.java    |  3 +-
 .../ats/TestHistoryEventTimelineConversion.java |  4 +-
 8 files changed, 10 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e4222fe..62833fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,6 @@ ALL CHANGES:
   TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
   TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
   TEZ-1761. TestRecoveryParser::testGetLastInProgressDAG fails in similar manner to TEZ-1686.
-  TEZ-1737. Should add taskNum in VertexFinishedEvent.
   TEZ-1770. Handle ConnectExceptions correctly when establishing connections to an NM which may be down.
   TEZ-1774. AppLaunched event for Timeline does not have start time set.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 681fbaa..593ecca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1135,7 +1135,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         } else {
           vertexCompleteSeen = true;
         }
-        numTasks = finishedEvent.getNumTasks();
         recoveryCommitInProgress = false;
         recoveredState = finishedEvent.getState();
         diagnostics.add(finishedEvent.getDiagnostics());
@@ -1611,7 +1610,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get());
     taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get());
 
-    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested,
+    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
         initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
         getAllCounters(), getVertexStats(), taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index d1d7eef..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -43,7 +43,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
   private String vertexName;
-  private int numTasks;
   private long initRequestedTime;
   private long initedTime;
   private long startRequestedTime;
@@ -56,14 +55,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   private VertexStats vertexStats;
   private Map<String, Integer> vertexTaskStats;
 
-  public VertexFinishedEvent(TezVertexID vertexId, String vertexName, int numTasks, long initRequestedTime,
+  public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
                              long initedTime, long startRequestedTime, long startedTime,
                              long finishTime, VertexState state, String diagnostics,
                              TezCounters counters, VertexStats vertexStats,
                              Map<String, Integer> vertexTaskStats) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
-    this.numTasks = numTasks;
     this.initRequestedTime = initRequestedTime;
     this.initedTime = initedTime;
     this.startRequestedTime = startRequestedTime;
@@ -184,17 +182,12 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     return vertexTaskStats;
   }
 
-  public int getNumTasks() {
-    return numTasks;
-  }
-
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
     VertexFinishStateProto finishStateProto =
         VertexFinishStateProto.newBuilder()
             .setState(state.ordinal())
             .setVertexId(vertexID.toString())
-            .setNumTasks(numTasks)
             .build();
 
     SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
@@ -212,7 +205,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
         VertexFinishStateProto.parseFrom(proto.getEventPayload());
     this.vertexID = TezVertexID.fromString(finishStateProto.getVertexId());
     this.state = VertexState.values()[finishStateProto.getState()];
-    this.numTasks = finishStateProto.getNumTasks();
     this.finishTime = proto.getTimestamp();
     this.fromSummary = true;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5d18b0a..93f217f 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -198,5 +198,4 @@ message SummaryEventProto {
 message VertexFinishStateProto {
   optional string vertex_id = 1;
   optional int32 state = 2;
-  optional int32 num_tasks = 3;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 1b8ac2f..8fa574c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -47,7 +47,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
-import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -68,14 +67,11 @@ import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -219,49 +215,6 @@ public class TestVertexRecovery {
     return dag;
   }
 
-  private DAGPlan createDAGPlanSingleVertex() {
-    DAGPlan dag =
-        DAGPlan
-            .newBuilder()
-            .setName("testverteximpl")
-            .addVertex(
-                VertexPlan
-                    .newBuilder()
-                    .setName("vertex1")
-                    .setType(PlanVertexType.NORMAL)
-                    .addTaskLocationHint(
-                        PlanTaskLocationHint.newBuilder().addHost("host1")
-                            .addRack("rack1").build())
-                    .setTaskConfig(
-                        PlanTaskConfiguration.newBuilder().setNumTasks(-1)
-                            .setVirtualCores(4).setMemoryMb(1024)
-                            .setJavaOpts("").setTaskModule("x1.y1").build())
-                    .addInputs(RootInputLeafOutputProto.newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("input").build())
-                            .setName("inputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName("inputinitlizer"))
-                            .build())
-                    .addOutputs(
-                        DAGProtos.RootInputLeafOutputProto
-                            .newBuilder()
-                            .setIODescriptor(
-                                TezEntityDescriptorProto.newBuilder()
-                                    .setClassName("output").build())
-                            .setName("outputx")
-                            .setControllerDescriptor(
-                                TezEntityDescriptorProto
-                                    .newBuilder()
-                                    .setClassName(
-                                        CountingOutputCommitter.class.getName())))
-                    .build()).build();
-    return dag;
-  }
-
   class VertexEventHanlder implements EventHandler<VertexEvent> {
 
     private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -363,44 +316,6 @@ public class TestVertexRecovery {
   }
 
   /**
-   * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
-   * @throws IOException 
-   */
-  @Test
-  public void testRecovery_Desired_SUCCEEDED_OnlySummaryLog() throws IOException {
-    DAGPlan dagPlan = createDAGPlanSingleVertex();
-    dag =
-        new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
-            new Credentials(), new SystemClock(), user,
-            mock(TaskHeartbeatHandler.class), mockAppContext);
-    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
-
-    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
-    VertexFinishedEvent vertexFinishEvent = new VertexFinishedEvent();
-    vertexFinishEvent.fromSummaryProtoStream(SummaryEventProto.newBuilder()
-        .setDagId(dag.getID().toString())
-        .setEventType(HistoryEventType.VERTEX_FINISHED.ordinal())
-        .setTimestamp(100L)
-        .setEventPayload(VertexFinishStateProto.newBuilder()
-            .setNumTasks(2)
-            .setState(VertexState.SUCCEEDED.ordinal())
-            .setVertexId(vertex1.getVertexId().toString()).build().toByteString())
-        .build());
-    VertexState recoveredState = vertex1.restoreFromEvent(vertexFinishEvent);
-    // numTasks is recovered from summary log
-    assertEquals(2, vertex1.numTasks);
-    assertEquals(VertexState.SUCCEEDED, recoveredState);
-    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
-        VertexState.SUCCEEDED));
-    dispatcher.await();
-    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
-    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
-    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
-  }
-
-  /**
    * vertex1(New) -> StartRecoveryTransition(FAILED)
    */
   @Test
@@ -661,7 +576,7 @@ public class TestVertexRecovery {
     long finishTime = startTime + 100L;
     recoveredState =
         vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
-            "vertex1", 1, initRequestedTime, initedTime, startRequestedTime,
+            "vertex1", initRequestedTime, initedTime, startRequestedTime,
             startTime, finishTime, VertexState.SUCCEEDED, "",
             new TezCounters(), new VertexStats(), null));
     assertEquals(finishTime, vertex1.finishTime);
@@ -932,7 +847,7 @@ public class TestVertexRecovery {
     assertEquals(VertexState.RUNNING, recoveredState);
 
     recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
-        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
         initRequestedTime + 400L, initRequestedTime + 500L,
         VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
     assertEquals(VertexState.SUCCEEDED, recoveredState);

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 8913287..a7a23db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
       VertexFinishedEvent event =
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
-              "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+              "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
               null, null, null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
@@ -376,7 +376,7 @@ public class TestHistoryEventsProtoConversion {
       VertexFinishedEvent event =
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
-              "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+              "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
               "diagnose", new TezCounters(), new VertexStats(), null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index a20c9fe..c3d51c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -138,7 +139,7 @@ public class TestHistoryEventJsonConversion {
           event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 10);
           break;
         case VERTEX_FINISHED:
-          event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
               null, null, null, null);
           break;

http://git-wip-us.apache.org/repos/asf/tez/blob/3197cf08/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index ce47820..1347d28 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -140,7 +140,7 @@ public class TestHistoryEventTimelineConversion {
           event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
           break;
         case VERTEX_FINISHED:
-          event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
               null, null, null, null);
           break;
@@ -452,7 +452,7 @@ public class TestHistoryEventTimelineConversion {
     taskStats.put("BAR", 200);
     VertexStats vertexStats = new VertexStats();
 
-    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1,initRequestedTime,
+    VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", initRequestedTime,
         initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
         "diagnostics", null, vertexStats, taskStats);