You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/08 00:51:12 UTC

tez git commit: TEZ-485. Get rid of TezTaskStatus. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master f4be19375 -> 4fe8eae91


TEZ-485. Get rid of TezTaskStatus. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4fe8eae9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4fe8eae9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4fe8eae9

Branch: refs/heads/master
Commit: 4fe8eae91cde0d0743681a0d38bce5faad64008f
Parents: f4be193
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jan 7 15:50:46 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jan 7 15:50:46 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  91 ------
 .../org/apache/tez/common/TezTaskStatus.java    | 105 -------
 .../tez/mapreduce/hadoop/MRTaskStatus.java      | 300 -------------------
 .../tez/mapreduce/hadoop/TezTypeConverters.java |   7 -
 .../apache/tez/mapreduce/processor/MRTask.java  |  15 -
 6 files changed, 1 insertion(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b6b15ce..83c73fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-485. Get rid of TezTaskStatus.
   TEZ-1899. Fix findbugs warnings in tez-common module.
   TEZ-1898. Fix findbugs warnings in tez-api module.
   TEZ-1906. Fix findbugs warnings in tez-yarn-timeline-history-with-acls.

http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cf34e0e..53da741 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -247,97 +247,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return task;
   }
 
-  /*
-  @Override
-  public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
-      TezTaskStatus taskStatus) throws IOException, InterruptedException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Status update from: " + taskAttemptId);
-    }
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-    TaskAttemptStatusOld taskAttemptStatus = new TaskAttemptStatusOld();
-    taskAttemptStatus.id = taskAttemptId;
-    // Task sends the updated progress to the TT.
-    taskAttemptStatus.progress = taskStatus.getProgress();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Progress of TaskAttempt " + taskAttemptId + " is : "
-          + taskStatus.getProgress());
-    }
-
-    // Task sends the updated state-string to the TT.
-    taskAttemptStatus.stateString = taskStatus.getStateString();
-
-    // Set the output-size when map-task finishes. Set by the task itself.
-    // outputSize is never used.
-    taskAttemptStatus.outputSize = taskStatus.getLocalOutputSize();
-
-    // TODO Phase
-    // Task sends the updated phase to the TT.
-    //taskAttemptStatus.phase = MRxTypeConverters.toYarn(taskStatus.getPhase());
-
-    // TODO MRXAM3 - AVoid the 10 layers of convresion.
-    // Counters are updated by the task. Convert counters into new format as
-    // that is the primary storage format inside the AM to avoid multiple
-    // conversions and unnecessary heap usage.
-    taskAttemptStatus.counters = taskStatus.getCounters();
-
-
-    // Map Finish time set by the task (map only)
-    // TODO CLEANMRXAM - maybe differentiate between map / reduce / types
-    if (taskStatus.getMapFinishTime() != 0) {
-      taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
-    }
-
-    // Shuffle Finish time set by the task (reduce only).
-    if (taskStatus.getShuffleFinishTime() != 0) {
-      taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
-    }
-
-    // Sort finish time set by the task (reduce only).
-    if (taskStatus.getSortFinishTime() != 0) {
-      taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
-    }
-
-    // Not Setting the task state. Used by speculation - will be set in
-    // TaskAttemptImpl
-    // taskAttemptStatus.taskState =
-    // TypeConverter.toYarn(taskStatus.getRunState());
-
-    // set the fetch failures
-    if (taskStatus.getFailedDependencies() != null
-        && taskStatus.getFailedDependencies().size() > 0) {
-      LOG.warn("Failed dependencies are not handled at the moment." +
-      		" The job is likely to fail / hang");
-      taskAttemptStatus.fetchFailedMaps = new ArrayList<TezTaskAttemptID>();
-      for (TezTaskAttemptID failedAttemptId : taskStatus
-          .getFailedDependencies()) {
-        taskAttemptStatus.fetchFailedMaps.add(failedAttemptId);
-      }
-    }
-
-    // Task sends the information about the nextRecordRange to the TT
-
-    // TODO: The following are not needed here, but needed to be set somewhere
-    // inside AppMaster.
-    // taskStatus.getRunState(); // Set by the TT/JT. Transform into a state
-    // TODO
-    // taskStatus.getStartTime(); // Used to be set by the TaskTracker. This
-    // should be set by getTask().
-    // taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set
-    // when task finishes
-    // // This was used by TT to do counter updates only once every minute. So
-    // this
-    // // isn't ever changed by the Task itself.
-    // taskStatus.getIncludeCounters();
-
-    context.getEventHandler().handle(
-        new TaskAttemptEventStatusUpdate(taskAttemptStatus.id,
-            taskAttemptStatus));
-    return true;
-  }
-  */
-
   /**
    * Child checking whether it can commit.
    *

http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
deleted file mode 100644
index 45ea80e..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-// TODO NEWTEZ Get rid of this.
-public interface TezTaskStatus extends Writable {
-
-  //enumeration for reporting current phase of a task.
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
-
-  // what state is the task in?
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
-
-  public abstract TezTaskAttemptID getTaskAttemptId();
-
-  public abstract float getProgress();
-
-  public abstract void setProgress(float progress);
-
-  public abstract State getRunState();
-
-  public abstract void setRunState(State runState);
-
-  public abstract String getDiagnosticInfo();
-
-  public abstract void setDiagnosticInfo(String info);
-
-  // TODOTEZDAG Remove stateString / rename
-  public abstract String getStateString();
-
-  public abstract void setStateString(String stateString);
-
-  public abstract long getFinishTime();
-
-  public abstract void setFinishTime(long finishTime);
-  
-  // TODOTEZDAG Can shuffle / merge be made generic ? Otherwise just a single finish time.
-  public abstract long getShuffleFinishTime();
-
-  public abstract void setShuffleFinishTime(long shuffleFinishTime);
-  
-  public abstract long getMapFinishTime();
-
-  public abstract void setMapFinishTime(long mapFinishTime);
-  
-  public abstract long getSortFinishTime();
-  
-  public abstract void setSortFinishTime(long sortFinishTime);
-  
-  public abstract long getStartTime();
-  
-  public abstract void setStartTime(long startTime);
-
-  // TODOTEZDAG Remove phase
-  public abstract Phase getPhase();
-
-  public abstract void setPhase(Phase phase);
-
-  public abstract TezCounters getCounters();
-
-  public abstract void setCounters(TezCounters counters);
-
-  public abstract List<TezTaskAttemptID> getFailedDependencies();
-
-  public abstract void addFailedDependency(TezTaskAttemptID taskAttempttId);
-
-  public abstract void clearStatus();
-
-  public abstract void statusUpdate(float f, String string, TezCounters counters);
-
-  // TODOTEZDAG maybe remove ?
-  public abstract long getLocalOutputSize();
-
-  public abstract void setOutputSize(long l);
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
deleted file mode 100644
index f04a759..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.TaskStatus;
-import org.apache.tez.common.TezTaskStatus;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class MRTaskStatus implements TezTaskStatus {
-
-  static final Log LOG =
-      LogFactory.getLog(TaskStatus.class.getName());
-  // max task-status string size
-  static final int MAX_STRING_SIZE = 1024;
-
-  private TezTaskAttemptID taskAttemptId;
-  private State state = State.UNASSIGNED;
-  private float progress = 0.0f;
-  private String diagnostics = "";
-  private String userStatusInfo = "";
-  private Phase phase;
-  private TezCounters counters;
-  
-  private long localOutputSize;
-  List<TezTaskAttemptID> failedTaskDependencies = 
-      new ArrayList<TezTaskAttemptID>();
-  
-  private long startTime;
-  private long finishTime;
-  private long sortFinishTime;
-  private long mapFinishTime;
-  private long shuffleFinishTime;
-  
-  // For serialization.
-  public MRTaskStatus() {
-  }
-  
-  public MRTaskStatus(
-      TezTaskAttemptID taskAttemptId,  
-      TezCounters counters, Phase phase) {
-    this.taskAttemptId = taskAttemptId;
-    this.counters = counters;
-    this.phase = phase;
-  }
-  
-  @Override
-  public TezTaskAttemptID getTaskAttemptId() {
-    return taskAttemptId;
-  }
-
-  @Override
-  public float getProgress() {
-    return progress; 
-  }
-
-  @Override
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  @Override
-  public State getRunState() {
-    return state;
-  }
-
-  @Override
-  public void setRunState(State state) {
-    this.state = state;
-  }
-
-  @Override
-  public String getDiagnosticInfo() {
-    return diagnostics;
-  }
-
-  @Override
-  public void setDiagnosticInfo(String info) {
-    this.diagnostics = info;
-  }
-
-  @Override
-  public String getStateString() {
-    return userStatusInfo;
-  }
-
-  @Override
-  public void setStateString(String userStatusInfo) {
-    this.userStatusInfo = userStatusInfo;
-  }
-
-  @Override
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  @Override
-  public long getShuffleFinishTime() {
-    return shuffleFinishTime;
-  }
-
-  @Override
-  public long getMapFinishTime() {
-    return mapFinishTime;
-  }
-
-  @Override
-  public long getSortFinishTime() {
-    return sortFinishTime;
-  }
-
-  @Override
-  public long getStartTime() {
-    return startTime;
-  }
-
-  @Override
-  public Phase getPhase() {
-    return phase;
-  }
-
-  @Override
-  public void setPhase(Phase phase) {
-    Phase oldPhase = getPhase();
-    if (oldPhase != phase) {
-      // sort phase started
-      if (phase == Phase.SORT){
-        if (oldPhase == Phase.MAP) {
-          setMapFinishTime(System.currentTimeMillis());
-        } else {
-          setShuffleFinishTime(System.currentTimeMillis());
-        }
-      } else if (phase == Phase.REDUCE) {
-        setSortFinishTime(System.currentTimeMillis());
-      }
-      this.phase = phase;
-    }
-  }
-
-  @Override
-  public TezCounters getCounters() {
-    return counters;
-  }
-
-  @Override
-  public void setCounters(TezCounters counters) {
-    this.counters = counters;
-  }
-
-  @Override
-  public long getLocalOutputSize() {
-    return localOutputSize;
-  }
-
-  @Override
-  public List<TezTaskAttemptID> getFailedDependencies() {
-    return failedTaskDependencies;
-  }
-
-  @Override
-  public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
-    failedTaskDependencies.add(taskAttemptId);
-  }
-
-  @Override
-  synchronized public void clearStatus() {
-    userStatusInfo = "";
-    failedTaskDependencies.clear();
-  }
-
-  @Override
-  synchronized public void statusUpdate(
-      float progress, String userDiagnosticInfo, TezCounters counters) {
-    setProgress(progress);
-    setDiagnosticInfo(userDiagnosticInfo);
-    setCounters(counters);
-  }
-
-  @Override
-  public void setOutputSize(long localOutputSize) {
-    this.localOutputSize = localOutputSize;
-  }
-
-  @Override
-  public void setFinishTime(long finishTime) {
-    if(this.getStartTime() > 0 && finishTime > 0) {
-      if (getShuffleFinishTime() == 0) {
-        setShuffleFinishTime(finishTime);
-      }
-      if (getSortFinishTime() == 0){
-        setSortFinishTime(finishTime);
-      }
-      if (getMapFinishTime() == 0) {
-        setMapFinishTime(finishTime);
-      }
-      this.finishTime = finishTime;
-    }
-  }
-
-  @Override
-  public void setShuffleFinishTime(long shuffleFinishTime) {
-    this.shuffleFinishTime = shuffleFinishTime;
-  }
-
-  @Override
-  public void setMapFinishTime(long mapFinishTime) {
-    this.mapFinishTime = mapFinishTime;
-  }
-
-  @Override
-  public void setSortFinishTime(long sortFinishTime) {
-    this.sortFinishTime = sortFinishTime;
-    if (getShuffleFinishTime() == this.shuffleFinishTime ){
-      setShuffleFinishTime(sortFinishTime);
-    }
-  }
-
-  @Override
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    taskAttemptId.write(out);
-    WritableUtils.writeEnum(out, state);
-    out.writeFloat(progress);
-    WritableUtils.writeString(out, diagnostics);
-    WritableUtils.writeString(out, userStatusInfo);
-    WritableUtils.writeEnum(out, phase);
-
-    counters.write(out);
-    
-    out.writeLong(localOutputSize);
-    out.writeLong(startTime);
-    out.writeLong(finishTime);
-    out.writeLong(sortFinishTime);
-    out.writeLong(mapFinishTime);
-    out.writeLong(shuffleFinishTime);
-
-    out.writeInt(failedTaskDependencies.size());
-    for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
-      taskAttemptId.write(out);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    taskAttemptId = TezTaskAttemptID.readTezTaskAttemptID(in);
-    state = WritableUtils.readEnum(in, State.class);
-    progress = in.readFloat();
-    diagnostics = WritableUtils.readString(in);
-    userStatusInfo = WritableUtils.readString(in);
-    phase = WritableUtils.readEnum(in, Phase.class);
-    counters = new TezCounters();
-    
-    counters.readFields(in);
-    
-    localOutputSize = in.readLong();
-    startTime = in.readLong();
-    finishTime = in.readLong();
-    sortFinishTime = in.readLong();
-    mapFinishTime = in.readLong();
-    shuffleFinishTime = in.readLong();
-    
-    int numFailedDependencies = in.readInt();
-    for (int i = 0 ; i < numFailedDependencies ; i++) {
-      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.readTezTaskAttemptID(in);
-      failedTaskDependencies.add(taskAttemptId);
-    }
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
index 889c64c..6f9c1c7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.tez.common.TezTaskStatus.Phase;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -44,12 +43,6 @@ public class TezTypeConverters {
 
   }
   
-  public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
-      Phase phase) {
-    return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
-        .name());
-  }
-
   public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
     TaskAttemptID mrTaskAttemptId = IDConverter
         .toMRTaskAttemptId(taskAttemptId);

http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 5c27ce2..929d348 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezTaskStatus.State;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -329,10 +328,6 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
     this.taskAttemptContext =
         new TaskAttemptContextImpl(jobConf, taskAttemptId, mrReporter);
 
-    if (getState() == State.UNASSIGNED) {
-      setState(State.RUNNING);
-    }
-
     localizeConfiguration(jobConf);
   }
 
@@ -340,16 +335,6 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
     return mrReporter;
   }
 
-  public void setState(State state) {
-    // TODO Auto-generated method stub
-
-  }
-
-  public State getState() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public TezCounters getCounters() { return counters; }
 
   public void setConf(JobConf jobConf) {