You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/08 00:51:12 UTC
tez git commit: TEZ-485. Get rid of TezTaskStatus. (sseth)
Repository: tez
Updated Branches:
refs/heads/master f4be19375 -> 4fe8eae91
TEZ-485. Get rid of TezTaskStatus. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4fe8eae9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4fe8eae9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4fe8eae9
Branch: refs/heads/master
Commit: 4fe8eae91cde0d0743681a0d38bce5faad64008f
Parents: f4be193
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jan 7 15:50:46 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Jan 7 15:50:46 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 91 ------
.../org/apache/tez/common/TezTaskStatus.java | 105 -------
.../tez/mapreduce/hadoop/MRTaskStatus.java | 300 -------------------
.../tez/mapreduce/hadoop/TezTypeConverters.java | 7 -
.../apache/tez/mapreduce/processor/MRTask.java | 15 -
6 files changed, 1 insertion(+), 518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b6b15ce..83c73fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-485. Get rid of TezTaskStatus.
TEZ-1899. Fix findbugs warnings in tez-common module.
TEZ-1898. Fix findbugs warnings in tez-api module.
TEZ-1906. Fix findbugs warnings in tez-yarn-timeline-history-with-acls.
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cf34e0e..53da741 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -247,97 +247,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
return task;
}
- /*
- @Override
- public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
- TezTaskStatus taskStatus) throws IOException, InterruptedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Status update from: " + taskAttemptId);
- }
- taskHeartbeatHandler.progressing(taskAttemptId);
- pingContainerHeartbeatHandler(taskAttemptId);
- TaskAttemptStatusOld taskAttemptStatus = new TaskAttemptStatusOld();
- taskAttemptStatus.id = taskAttemptId;
- // Task sends the updated progress to the TT.
- taskAttemptStatus.progress = taskStatus.getProgress();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Progress of TaskAttempt " + taskAttemptId + " is : "
- + taskStatus.getProgress());
- }
-
- // Task sends the updated state-string to the TT.
- taskAttemptStatus.stateString = taskStatus.getStateString();
-
- // Set the output-size when map-task finishes. Set by the task itself.
- // outputSize is never used.
- taskAttemptStatus.outputSize = taskStatus.getLocalOutputSize();
-
- // TODO Phase
- // Task sends the updated phase to the TT.
- //taskAttemptStatus.phase = MRxTypeConverters.toYarn(taskStatus.getPhase());
-
- // TODO MRXAM3 - AVoid the 10 layers of convresion.
- // Counters are updated by the task. Convert counters into new format as
- // that is the primary storage format inside the AM to avoid multiple
- // conversions and unnecessary heap usage.
- taskAttemptStatus.counters = taskStatus.getCounters();
-
-
- // Map Finish time set by the task (map only)
- // TODO CLEANMRXAM - maybe differentiate between map / reduce / types
- if (taskStatus.getMapFinishTime() != 0) {
- taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
- }
-
- // Shuffle Finish time set by the task (reduce only).
- if (taskStatus.getShuffleFinishTime() != 0) {
- taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
- }
-
- // Sort finish time set by the task (reduce only).
- if (taskStatus.getSortFinishTime() != 0) {
- taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
- }
-
- // Not Setting the task state. Used by speculation - will be set in
- // TaskAttemptImpl
- // taskAttemptStatus.taskState =
- // TypeConverter.toYarn(taskStatus.getRunState());
-
- // set the fetch failures
- if (taskStatus.getFailedDependencies() != null
- && taskStatus.getFailedDependencies().size() > 0) {
- LOG.warn("Failed dependencies are not handled at the moment." +
- " The job is likely to fail / hang");
- taskAttemptStatus.fetchFailedMaps = new ArrayList<TezTaskAttemptID>();
- for (TezTaskAttemptID failedAttemptId : taskStatus
- .getFailedDependencies()) {
- taskAttemptStatus.fetchFailedMaps.add(failedAttemptId);
- }
- }
-
- // Task sends the information about the nextRecordRange to the TT
-
- // TODO: The following are not needed here, but needed to be set somewhere
- // inside AppMaster.
- // taskStatus.getRunState(); // Set by the TT/JT. Transform into a state
- // TODO
- // taskStatus.getStartTime(); // Used to be set by the TaskTracker. This
- // should be set by getTask().
- // taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set
- // when task finishes
- // // This was used by TT to do counter updates only once every minute. So
- // this
- // // isn't ever changed by the Task itself.
- // taskStatus.getIncludeCounters();
-
- context.getEventHandler().handle(
- new TaskAttemptEventStatusUpdate(taskAttemptStatus.id,
- taskAttemptStatus));
- return true;
- }
- */
-
/**
* Child checking whether it can commit.
*
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
deleted file mode 100644
index 45ea80e..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/common/TezTaskStatus.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-// TODO NEWTEZ Get rid of this.
-public interface TezTaskStatus extends Writable {
-
- //enumeration for reporting current phase of a task.
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
-
- // what state is the task in?
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
-
- public abstract TezTaskAttemptID getTaskAttemptId();
-
- public abstract float getProgress();
-
- public abstract void setProgress(float progress);
-
- public abstract State getRunState();
-
- public abstract void setRunState(State runState);
-
- public abstract String getDiagnosticInfo();
-
- public abstract void setDiagnosticInfo(String info);
-
- // TODOTEZDAG Remove stateString / rename
- public abstract String getStateString();
-
- public abstract void setStateString(String stateString);
-
- public abstract long getFinishTime();
-
- public abstract void setFinishTime(long finishTime);
-
- // TODOTEZDAG Can shuffle / merge be made generic ? Otherwise just a single finish time.
- public abstract long getShuffleFinishTime();
-
- public abstract void setShuffleFinishTime(long shuffleFinishTime);
-
- public abstract long getMapFinishTime();
-
- public abstract void setMapFinishTime(long mapFinishTime);
-
- public abstract long getSortFinishTime();
-
- public abstract void setSortFinishTime(long sortFinishTime);
-
- public abstract long getStartTime();
-
- public abstract void setStartTime(long startTime);
-
- // TODOTEZDAG Remove phase
- public abstract Phase getPhase();
-
- public abstract void setPhase(Phase phase);
-
- public abstract TezCounters getCounters();
-
- public abstract void setCounters(TezCounters counters);
-
- public abstract List<TezTaskAttemptID> getFailedDependencies();
-
- public abstract void addFailedDependency(TezTaskAttemptID taskAttempttId);
-
- public abstract void clearStatus();
-
- public abstract void statusUpdate(float f, String string, TezCounters counters);
-
- // TODOTEZDAG maybe remove ?
- public abstract long getLocalOutputSize();
-
- public abstract void setOutputSize(long l);
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
deleted file mode 100644
index f04a759..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.TaskStatus;
-import org.apache.tez.common.TezTaskStatus;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class MRTaskStatus implements TezTaskStatus {
-
- static final Log LOG =
- LogFactory.getLog(TaskStatus.class.getName());
- // max task-status string size
- static final int MAX_STRING_SIZE = 1024;
-
- private TezTaskAttemptID taskAttemptId;
- private State state = State.UNASSIGNED;
- private float progress = 0.0f;
- private String diagnostics = "";
- private String userStatusInfo = "";
- private Phase phase;
- private TezCounters counters;
-
- private long localOutputSize;
- List<TezTaskAttemptID> failedTaskDependencies =
- new ArrayList<TezTaskAttemptID>();
-
- private long startTime;
- private long finishTime;
- private long sortFinishTime;
- private long mapFinishTime;
- private long shuffleFinishTime;
-
- // For serialization.
- public MRTaskStatus() {
- }
-
- public MRTaskStatus(
- TezTaskAttemptID taskAttemptId,
- TezCounters counters, Phase phase) {
- this.taskAttemptId = taskAttemptId;
- this.counters = counters;
- this.phase = phase;
- }
-
- @Override
- public TezTaskAttemptID getTaskAttemptId() {
- return taskAttemptId;
- }
-
- @Override
- public float getProgress() {
- return progress;
- }
-
- @Override
- public void setProgress(float progress) {
- this.progress = progress;
- }
-
- @Override
- public State getRunState() {
- return state;
- }
-
- @Override
- public void setRunState(State state) {
- this.state = state;
- }
-
- @Override
- public String getDiagnosticInfo() {
- return diagnostics;
- }
-
- @Override
- public void setDiagnosticInfo(String info) {
- this.diagnostics = info;
- }
-
- @Override
- public String getStateString() {
- return userStatusInfo;
- }
-
- @Override
- public void setStateString(String userStatusInfo) {
- this.userStatusInfo = userStatusInfo;
- }
-
- @Override
- public long getFinishTime() {
- return finishTime;
- }
-
- @Override
- public long getShuffleFinishTime() {
- return shuffleFinishTime;
- }
-
- @Override
- public long getMapFinishTime() {
- return mapFinishTime;
- }
-
- @Override
- public long getSortFinishTime() {
- return sortFinishTime;
- }
-
- @Override
- public long getStartTime() {
- return startTime;
- }
-
- @Override
- public Phase getPhase() {
- return phase;
- }
-
- @Override
- public void setPhase(Phase phase) {
- Phase oldPhase = getPhase();
- if (oldPhase != phase) {
- // sort phase started
- if (phase == Phase.SORT){
- if (oldPhase == Phase.MAP) {
- setMapFinishTime(System.currentTimeMillis());
- } else {
- setShuffleFinishTime(System.currentTimeMillis());
- }
- } else if (phase == Phase.REDUCE) {
- setSortFinishTime(System.currentTimeMillis());
- }
- this.phase = phase;
- }
- }
-
- @Override
- public TezCounters getCounters() {
- return counters;
- }
-
- @Override
- public void setCounters(TezCounters counters) {
- this.counters = counters;
- }
-
- @Override
- public long getLocalOutputSize() {
- return localOutputSize;
- }
-
- @Override
- public List<TezTaskAttemptID> getFailedDependencies() {
- return failedTaskDependencies;
- }
-
- @Override
- public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
- failedTaskDependencies.add(taskAttemptId);
- }
-
- @Override
- synchronized public void clearStatus() {
- userStatusInfo = "";
- failedTaskDependencies.clear();
- }
-
- @Override
- synchronized public void statusUpdate(
- float progress, String userDiagnosticInfo, TezCounters counters) {
- setProgress(progress);
- setDiagnosticInfo(userDiagnosticInfo);
- setCounters(counters);
- }
-
- @Override
- public void setOutputSize(long localOutputSize) {
- this.localOutputSize = localOutputSize;
- }
-
- @Override
- public void setFinishTime(long finishTime) {
- if(this.getStartTime() > 0 && finishTime > 0) {
- if (getShuffleFinishTime() == 0) {
- setShuffleFinishTime(finishTime);
- }
- if (getSortFinishTime() == 0){
- setSortFinishTime(finishTime);
- }
- if (getMapFinishTime() == 0) {
- setMapFinishTime(finishTime);
- }
- this.finishTime = finishTime;
- }
- }
-
- @Override
- public void setShuffleFinishTime(long shuffleFinishTime) {
- this.shuffleFinishTime = shuffleFinishTime;
- }
-
- @Override
- public void setMapFinishTime(long mapFinishTime) {
- this.mapFinishTime = mapFinishTime;
- }
-
- @Override
- public void setSortFinishTime(long sortFinishTime) {
- this.sortFinishTime = sortFinishTime;
- if (getShuffleFinishTime() == this.shuffleFinishTime ){
- setShuffleFinishTime(sortFinishTime);
- }
- }
-
- @Override
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
- WritableUtils.writeEnum(out, state);
- out.writeFloat(progress);
- WritableUtils.writeString(out, diagnostics);
- WritableUtils.writeString(out, userStatusInfo);
- WritableUtils.writeEnum(out, phase);
-
- counters.write(out);
-
- out.writeLong(localOutputSize);
- out.writeLong(startTime);
- out.writeLong(finishTime);
- out.writeLong(sortFinishTime);
- out.writeLong(mapFinishTime);
- out.writeLong(shuffleFinishTime);
-
- out.writeInt(failedTaskDependencies.size());
- for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
- taskAttemptId.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskAttemptId = TezTaskAttemptID.readTezTaskAttemptID(in);
- state = WritableUtils.readEnum(in, State.class);
- progress = in.readFloat();
- diagnostics = WritableUtils.readString(in);
- userStatusInfo = WritableUtils.readString(in);
- phase = WritableUtils.readEnum(in, Phase.class);
- counters = new TezCounters();
-
- counters.readFields(in);
-
- localOutputSize = in.readLong();
- startTime = in.readLong();
- finishTime = in.readLong();
- sortFinishTime = in.readLong();
- mapFinishTime = in.readLong();
- shuffleFinishTime = in.readLong();
-
- int numFailedDependencies = in.readInt();
- for (int i = 0 ; i < numFailedDependencies ; i++) {
- TezTaskAttemptID taskAttemptId = TezTaskAttemptID.readTezTaskAttemptID(in);
- failedTaskDependencies.add(taskAttemptId);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
index 889c64c..6f9c1c7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.tez.common.TezTaskStatus.Phase;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -44,12 +43,6 @@ public class TezTypeConverters {
}
- public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
- Phase phase) {
- return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
- .name());
- }
-
public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
TaskAttemptID mrTaskAttemptId = IDConverter
.toMRTaskAttemptId(taskAttemptId);
http://git-wip-us.apache.org/repos/asf/tez/blob/4fe8eae9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 5c27ce2..929d348 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezTaskStatus.State;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
@@ -329,10 +328,6 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
this.taskAttemptContext =
new TaskAttemptContextImpl(jobConf, taskAttemptId, mrReporter);
- if (getState() == State.UNASSIGNED) {
- setState(State.RUNNING);
- }
-
localizeConfiguration(jobConf);
}
@@ -340,16 +335,6 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
return mrReporter;
}
- public void setState(State state) {
- // TODO Auto-generated method stub
-
- }
-
- public State getState() {
- // TODO Auto-generated method stub
- return null;
- }
-
public TezCounters getCounters() { return counters; }
public void setConf(JobConf jobConf) {