You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:35 UTC

[26/50] [abbrv] tez git commit: TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl (zjffdu)

TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/31c1de6d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/31c1de6d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/31c1de6d

Branch: refs/heads/TEZ-8
Commit: 31c1de6daa7b6eff1a32a8d10999680084085fc1
Parents: 71ca74e
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Nov 7 10:20:37 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Nov 7 10:20:37 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/dag/app/dag/Task.java   | 19 -----
 .../dag/app/dag/TaskAttemptStateInternal.java   |  1 -
 .../dag/app/dag/event/TaskAttemptEventType.java |  1 -
 .../tez/dag/app/dag/event/TaskEventType.java    |  1 -
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 72 -----------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 84 --------------------
 7 files changed, 1 insertion(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e58f0a..8fad485 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
 ALL CHANGES:
   TEZ-1733. TezMerger should sort FileChunks on size when merging
   TEZ-1738. Tez tfile parser for log parsing
+  TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl
 
 Release 0.5.3: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b5fa45f..98a85cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -55,25 +55,6 @@ public interface Task {
    * @return whether the attempt's output can be committed or not.
    */
   boolean canCommit(TezTaskAttemptID taskAttemptID);
-
-  
-  /**
-   * Do the running tasks need to stick around after they're done processing and
-   * generating output. Required for tasks which have custom output handling
-   * such as in-memory shuffle.
-   * 
-   * @return whether the task needs to stick around.
-   */
-  boolean needsWaitAfterOutputConsumable();
-  
-  /**
-   * Get the attempt id which has reported in as output ready. null if not
-   * applicable.
-   * 
-   * @return the attempt id which has reported in as output ready. null if not
-   * applicable.
-   */
-  TezTaskAttemptID getOutputConsumableAttempt();
   
   public Vertex getVertex();
   

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
index a49c2a3..8d0d83e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
@@ -29,7 +29,6 @@ public enum TaskAttemptStateInternal {
   NEW,
   START_WAIT,
   RUNNING,
-  OUTPUT_CONSUMABLE, 
   KILL_IN_PROGRESS, 
   FAIL_IN_PROGRESS,
   KILLED,

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index e7db8d1..c8eec1b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -30,7 +30,6 @@ public enum TaskAttemptEventType {
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
   TA_DIAGNOSTICS_UPDATE,
-  TA_OUTPUT_CONSUMABLE,  // TODO History event to indicate this ?
   TA_COMMIT_PENDING,
   TA_DONE,
   TA_FAILED,

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index fe4dd5a..baec5f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -34,7 +34,6 @@ public enum TaskEventType {
 
   //Producer:TaskAttempt
   T_ATTEMPT_LAUNCHED,
-  T_ATTEMPT_OUTPUT_CONSUMABLE,
   T_ATTEMPT_FAILED,
   T_ATTEMPT_SUCCEEDED,
   T_ATTEMPT_KILLED,

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 45ca543..deaba42 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -236,10 +236,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.RUNNING,
           TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
           DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-      .addTransition(TaskAttemptStateInternal.RUNNING,
-          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
-          new OutputConsumableTransition())
       // Optional, may not come in for all tasks.
       .addTransition(TaskAttemptStateInternal.RUNNING,
           TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
@@ -279,57 +275,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_OUTPUT_FAILED,
           new OutputReportedFailedTransition())
 
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
-          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptEventType.TA_OUTPUT_CONSUMABLE)
-      // Stuck RPC. The client retries in a loop.
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
-          new SucceededTransition())
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
-          TaskAttemptEventType.TA_FAILED,
-          new TerminatedWhileRunningTransition(FAILED_HELPER))
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
-          TaskAttemptEventType.TA_TIMED_OUT,
-          new TerminatedWhileRunningTransition(FAILED_HELPER))
-      // TODO CREUSE Ensure TaskCompletionEvents are updated to reflect this.
-      // Something needs to go out to the job.
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.KILL_IN_PROGRESS,
-          TaskAttemptEventType.TA_KILL_REQUEST,
-          new TerminatedWhileRunningTransition(KILLED_HELPER))
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.KILL_IN_PROGRESS,
-          TaskAttemptEventType.TA_NODE_FAILED,
-          new TerminatedWhileRunningTransition(KILLED_HELPER))
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
-          TaskAttemptEventType.TA_CONTAINER_TERMINATING,
-          new TerminatedWhileRunningTransition(FAILED_HELPER))
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.FAILED,
-          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
-          new ContainerCompletedBeforeRunningTransition())
-      .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          TaskAttemptStateInternal.KILLED,
-          TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
-          new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
-      .addTransition(
-          TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
-          EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
-              TaskAttemptStateInternal.OUTPUT_CONSUMABLE),
-          TaskAttemptEventType.TA_OUTPUT_FAILED,
-          new OutputReportedFailedTransition())
-
       .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptStateInternal.KILLED,
           TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -344,7 +289,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
@@ -367,7 +311,6 @@ public class TaskAttemptImpl implements TaskAttempt,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
@@ -387,7 +330,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_SCHEDULE,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
@@ -408,7 +350,6 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_SCHEDULE,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
-              TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
               TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
               TaskAttemptEventType.TA_TIMED_OUT,
@@ -762,7 +703,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     case START_WAIT:
       return TaskAttemptState.STARTING;
     case RUNNING:
-    case OUTPUT_CONSUMABLE:
       return TaskAttemptState.RUNNING;
     case FAILED:
     case FAIL_IN_PROGRESS:
@@ -1298,18 +1238,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
-  protected static class OutputConsumableTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-
-    @Override
-    public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
-      //TaskAttemptEventOutputConsumable orEvent = (TaskAttemptEventOutputConsumable) event;
-      //ta.shufflePort = orEvent.getOutputContext().getShufflePort();
-      ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
-          TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE));
-    }
-  }
-
   protected static class SucceededTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/31c1de6d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 7bafb3b..4ded9be 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -187,10 +187,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // Transitions from RUNNING state
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
-    // This is an optional event.
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
-        TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
-        new AttemptProcessingCompleteTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
     .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
@@ -229,12 +225,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_OUTPUT_CONSUMABLE,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
-      // TODO May required different handling if OUTPUT_CONSUMABLE is one of
-      // the stages. i.e. Task would only SUCCEED after all output consumed.
     .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new TaskRetroactiveFailureTransition())
@@ -298,9 +291,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
    */
 
-  private TezTaskAttemptID outputConsumableAttempt;
-  private boolean outputConsumableAttemptSuccessSent = false;
-
   //should be set to one which comes first
   //saying COMMIT_PENDING
   private TezTaskAttemptID commitAttempt;
@@ -750,30 +740,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  // TODO remove hacky name lookup
-  @Override
-  public boolean needsWaitAfterOutputConsumable() {
-    Vertex vertex = getVertex();
-    ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
-    if (processorDescriptor != null &&
-        processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-
-  @Override
-  public TezTaskAttemptID getOutputConsumableAttempt() {
-    readLock.lock();
-    try {
-      return this.outputConsumableAttempt;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
@@ -877,18 +843,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
       TaskAttemptStateInternal attemptState) {
-    // raise the completion event only if the container is assigned
-    // to nextAttemptNumber
-    if (needsWaitAfterOutputConsumable()) {
-      // An event may have been sent out during the OUTPUT_READY state itself.
-      // Make sure the same event is not being sent out again.
-      if (attemptId == outputConsumableAttempt
-          && attemptState == TaskAttemptStateInternal.SUCCEEDED) {
-        if (outputConsumableAttemptSuccessSent) {
-          return;
-        }
-      }
-    }
     eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, attemptState));
   }
 
@@ -1037,38 +991,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  private static class AttemptProcessingCompleteTransition implements
-      SingleArcTransition<TaskImpl, TaskEvent> {
-
-    @Override
-    public void transition(TaskImpl task, TaskEvent event) {
-      TaskEventTAUpdate taEvent = (TaskEventTAUpdate) event;
-      TezTaskAttemptID attemptId = taEvent.getTaskAttemptID();
-
-      if (task.outputConsumableAttempt == null) {
-        task.sendTaskAttemptCompletionEvent(attemptId,
-            TaskAttemptStateInternal.SUCCEEDED);
-        task.outputConsumableAttempt = attemptId;
-        task.outputConsumableAttemptSuccessSent = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("TezTaskAttemptID: " + attemptId
-              + " set as the OUTPUT_READY attempt");
-        }
-      } else {
-        // Nothing to do. This task will eventually be told to die, or will be
-        // killed.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("TezTaskAttemptID: "
-              + attemptId + " reporting OUTPUT_READY."
-              + " Will be asked to die since another attempt "
-              + task.outputConsumableAttempt + " already has output ready");
-        }
-        task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
-            "Alternate attemptId already serving output"));
-      }
-
-    }
-  }
 
   private static class AttemptSucceededTransition
       implements SingleArcTransition<TaskImpl, TaskEvent> {
@@ -1313,12 +1235,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
-      if (castEvent.getTaskAttemptID().equals(task.outputConsumableAttempt)) {
-        task.outputConsumableAttempt = null;
-        task.handleTaskAttemptCompletion(castEvent.getTaskAttemptID(),
-            TaskAttemptStateInternal.FAILED);
-      }
-
       // The attempt would have informed the scheduler about it's failure
 
       task.taskAttemptStatus.put(castEvent.getTaskAttemptID().getId(), true);