You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [14/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskStatus.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,299 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.records.TezTaskAttemptID;
+
+public class MRTaskStatus implements TezTaskStatus {
+
+  // Logger is keyed on MRTaskStatus itself (not Hadoop's TaskStatus) so that
+  // anything logged through it is attributed to this class.
+  static final Log LOG =
+      LogFactory.getLog(MRTaskStatus.class.getName());
+  // max task-status string size
+  static final int MAX_STRING_SIZE = 1024;
+
+  private TezTaskAttemptID taskAttemptId;
+  private State state = State.UNASSIGNED;
+  private float progress = 0.0f;
+  private String diagnostics = "";
+  private String userStatusInfo = "";
+  private Phase phase;
+  private TezCounters counters;
+  
+  private long localOutputSize;
+  List<TezTaskAttemptID> failedTaskDependencies = 
+      new ArrayList<TezTaskAttemptID>();
+  
+  private long startTime;
+  private long finishTime;
+  private long sortFinishTime;
+  private long mapFinishTime;
+  private long shuffleFinishTime;
+  
+  // For serialization.
+  public MRTaskStatus() {
+  }
+  
+  public MRTaskStatus(
+      TezTaskAttemptID taskAttemptId,  
+      TezCounters counters, Phase phase) {
+    this.taskAttemptId = taskAttemptId;
+    this.counters = counters;
+    this.phase = phase;
+  }
+  
+  @Override
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress; 
+  }
+
+  @Override
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  @Override
+  public State getRunState() {
+    return state;
+  }
+
+  @Override
+  public void setRunState(State state) {
+    this.state = state;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+
+  @Override
+  public void setDiagnosticInfo(String info) {
+    this.diagnostics = info;
+  }
+
+  @Override
+  public String getStateString() {
+    return userStatusInfo;
+  }
+
+  @Override
+  public void setStateString(String userStatusInfo) {
+    this.userStatusInfo = userStatusInfo;
+  }
+
+  @Override
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  @Override
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  @Override
+  public long getMapFinishTime() {
+    return mapFinishTime;
+  }
+
+  @Override
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public Phase getPhase() {
+    return phase;
+  }
+
+  /**
+   * Moves this status to a new phase and stamps the finish time of the phase
+   * being left with the current wall-clock time. Passing the phase the status
+   * is already in does nothing.
+   *
+   * @param phase the phase being entered
+   */
+  @Override
+  public void setPhase(Phase phase) {
+    Phase oldPhase = getPhase();
+    if (oldPhase != phase) {
+      // sort phase started
+      if (phase == Phase.SORT){
+        if (oldPhase == Phase.MAP) {
+          // MAP -> SORT: the map phase has just ended.
+          setMapFinishTime(System.currentTimeMillis());
+        } else {
+          // Entering SORT from any other phase is recorded as the end of shuffle.
+          setShuffleFinishTime(System.currentTimeMillis());
+        }
+      } else if (phase == Phase.REDUCE) {
+        // Entering REDUCE is recorded as the end of the sort.
+        setSortFinishTime(System.currentTimeMillis());
+      }
+      this.phase = phase;
+    }
+  }
+
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  @Override
+  public void setCounters(TezCounters counters) {
+    this.counters = counters;
+  }
+
+  @Override
+  public long getLocalOutputSize() {
+    return localOutputSize;
+  }
+
+  @Override
+  public List<TezTaskAttemptID> getFailedDependencies() {
+    return failedTaskDependencies;
+  }
+
+  @Override
+  public void addFailedDependency(TezTaskAttemptID taskAttemptId) {
+    failedTaskDependencies.add(taskAttemptId);
+  }
+
+  @Override
+  synchronized public void clearStatus() {
+    userStatusInfo = "";
+    failedTaskDependencies.clear();
+  }
+
+  @Override
+  synchronized public void statusUpdate(
+      float progress, String userDiagnosticInfo, TezCounters counters) {
+    setProgress(progress);
+    setDiagnosticInfo(userDiagnosticInfo);
+    setCounters(counters);
+  }
+
+  @Override
+  public void setOutputSize(long localOutputSize) {
+    this.localOutputSize = localOutputSize;
+  }
+
+  /**
+   * Records the finish time, but only when both the start time and the
+   * supplied value are positive; otherwise the call is silently ignored.
+   * Any of the shuffle, sort and map finish times that are still zero are set
+   * to the same value first.
+   *
+   * @param finishTime finish timestamp; ignored unless greater than zero
+   */
+  @Override
+  public void setFinishTime(long finishTime) {
+    if(this.getStartTime() > 0 && finishTime > 0) {
+      if (getShuffleFinishTime() == 0) {
+        setShuffleFinishTime(finishTime);
+      }
+      // Note: setSortFinishTime may itself touch the shuffle finish time.
+      if (getSortFinishTime() == 0){
+        setSortFinishTime(finishTime);
+      }
+      if (getMapFinishTime() == 0) {
+        setMapFinishTime(finishTime);
+      }
+      this.finishTime = finishTime;
+    }
+  }
+
+  @Override
+  public void setShuffleFinishTime(long shuffleFinishTime) {
+    this.shuffleFinishTime = shuffleFinishTime;
+  }
+
+  @Override
+  public void setMapFinishTime(long mapFinishTime) {
+    this.mapFinishTime = mapFinishTime;
+  }
+
+  /**
+   * Records when the sort phase finished. If no shuffle finish time has been
+   * recorded yet (it is still zero), the shuffle finish time is set to the
+   * same value.
+   *
+   * @param sortFinishTime sort-phase finish timestamp
+   */
+  @Override
+  public void setSortFinishTime(long sortFinishTime) {
+    this.sortFinishTime = sortFinishTime;
+    // Only fill in an unset shuffle time. Comparing the field with its own
+    // getter is always true and would overwrite a real shuffle timestamp.
+    if (getShuffleFinishTime() == 0) {
+      setShuffleFinishTime(sortFinishTime);
+    }
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * Serializes this status to {@code out}. The field order written here is the
+   * order {@link #readFields(DataInput)} reads back, so the two must change
+   * together.
+   * <p>
+   * NOTE(review): {@code taskAttemptId}, {@code phase} and {@code counters}
+   * are used without a null check, and the no-arg constructor leaves them
+   * null - presumably callers always populate them before writing; confirm.
+   *
+   * @param out sink for the serialized form
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    WritableUtils.writeEnum(out, state);
+    out.writeFloat(progress);
+    WritableUtils.writeString(out, diagnostics);
+    WritableUtils.writeString(out, userStatusInfo);
+    WritableUtils.writeEnum(out, phase);
+
+    counters.write(out);
+    
+    out.writeLong(localOutputSize);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    out.writeLong(sortFinishTime);
+    out.writeLong(mapFinishTime);
+    out.writeLong(shuffleFinishTime);
+
+    // Failed dependencies are length-prefixed: count first, then each id.
+    out.writeInt(failedTaskDependencies.size());
+    for(TezTaskAttemptID taskAttemptId : failedTaskDependencies) {
+      taskAttemptId.write(out);
+    }
+  }
+
+  /**
+   * Restores this status from {@code in}, reading the fields in the order
+   * {@link #write(DataOutput)} emits them. Any failed dependencies held before
+   * the call are discarded and replaced by the ones read.
+   *
+   * @param in source of the serialized form
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = TezTaskAttemptID.read(in);
+    state = WritableUtils.readEnum(in, State.class);
+    progress = in.readFloat();
+    diagnostics = WritableUtils.readString(in);
+    userStatusInfo = WritableUtils.readString(in);
+    phase = WritableUtils.readEnum(in, Phase.class);
+    counters = new TezCounters();
+
+    counters.readFields(in);
+
+    localOutputSize = in.readLong();
+    startTime = in.readLong();
+    finishTime = in.readLong();
+    sortFinishTime = in.readLong();
+    mapFinishTime = in.readLong();
+    shuffleFinishTime = in.readLong();
+
+    // Replace rather than append: the same instance may be deserialized into
+    // more than once, and stale entries must not survive a re-read.
+    failedTaskDependencies.clear();
+    int numFailedDependencies = in.readInt();
+    for (int i = 0 ; i < numFailedDependencies ; i++) {
+      // Named so it does not shadow the taskAttemptId field.
+      TezTaskAttemptID failedDependency = TezTaskAttemptID.read(in);
+      failedTaskDependencies.add(failedDependency);
+    }
+
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRTaskType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum MRTaskType {
+
+  MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP;
+
+  /**
+   * Returns the short form of this type: "m" for MAP, "r" for REDUCE, and the
+   * constant name for every other type.
+   */
+  public String toString() {
+    switch (this) {
+      case MAP:
+        return "m";
+      case REDUCE:
+        return "r";
+      default:
+        return this.name();
+    }
+  }
+
+  /**
+   * Parses a task type from either its short form ("m" / "r") or its constant
+   * name, i.e. it accepts the output of both {@link #toString()} and
+   * {@link #toSerializedString()}.
+   *
+   * @param taskTypeString text to parse; must not be null
+   * @return the matching task type
+   * @throws IllegalArgumentException if the text matches no task type
+   * @throws NullPointerException if {@code taskTypeString} is null
+   */
+  public static MRTaskType fromString(String taskTypeString) {
+    // MAP.toString() is itself "m" (and REDUCE.toString() is "r"), so the
+    // second test in each condition repeats the first; the constant names
+    // "MAP" and "REDUCE" are handled by valueOf in the final branch.
+    if (taskTypeString.equals("m") || taskTypeString.equals(MRTaskType.MAP.toString())) {
+      return MRTaskType.MAP;
+    } else if (taskTypeString.equals("r") || taskTypeString.equals(MRTaskType.REDUCE.toString())) {
+      return MRTaskType.REDUCE;
+    } else {
+      return MRTaskType.valueOf(taskTypeString);
+    }
+  }
+  
+  /**
+   * Returns the constant name of this type (e.g. "MAP"), unlike
+   * {@link #toString()}, which abbreviates MAP and REDUCE.
+   */
+  public String toSerializedString() {
+    return this.name();
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTaskUmbilicalProtocol.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.api.Master;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.OutputContext;
+
+/** Protocol that task child process uses to contact its parent process.  The
+ * parent is a daemon which polls the central master for a new map or
+ * reduce task and runs it as a child process.  All communication between child
+ * and parent is via this protocol. */ 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface TezTaskUmbilicalProtocol extends Master {
+
+  // NOTE(review): presumably the RPC protocol version number - confirm how it
+  // is consumed before changing it.
+  public static final long versionID = 19L;
+
+  ContainerTask getTask(ContainerContext containerContext) throws IOException;
+  
+  boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus) 
+  throws IOException, InterruptedException;
+  
+  void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
+  
+  boolean ping(TezTaskAttemptID taskid) throws IOException;
+
+  void done(TezTaskAttemptID taskid) throws IOException;
+  
+  void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus) 
+  throws IOException, InterruptedException;  
+
+  boolean canCommit(TezTaskAttemptID taskid) throws IOException;
+
+  void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
+  
+  void fsError(TezTaskAttemptID taskId, String message) throws IOException;
+
+  void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
+  
+  // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
+  // call?
+  // IAC outputReady followed by commit is a little confusing - since the output
+  // isn't really in place till a commit is called. Maybe rename to
+  // processingComplete or some such.
+  
+  // TODO EVENTUALLY This is not the most useful API. Once there's some kind of
+  // support for the Task handing output over to the Container, this won't really
+  // be required. i.e. InMemShuffle running as a service in the Container, or
+  // the second task in getTask(). ContainerUmbilical would include getTask and
+  // getServices...
+  
+  void outputReady(TezTaskAttemptID taskAttemptId, OutputContext outputContext)
+      throws IOException;
+  
+  ProceedToCompletionResponse
+      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.records.TezTaskAttemptID;
+
+/**
+ * Static helpers that translate between Tez record types and their
+ * MapReduce / MRv2 ("Yarn") counterparts.
+ */
+public class TezTypeConverters {
+
+  // Tez objects will be imported. Others will use the fully qualified name when
+  // required.
+  // All public methods named toYarn / toTez / toMapReduce
+  // (fromTez below is the one exception to that naming rule).
+
+  /**
+   * Maps a Tez phase onto the MRv2 phase with the same constant name.
+   *
+   * @param phase Tez phase; must not be null
+   * @return the MRv2 phase of the same name
+   * @throws IllegalArgumentException if MRv2 has no phase with that name
+   */
+  public static org.apache.hadoop.mapreduce.v2.api.records.Phase toYarn(
+      Phase phase) {
+    return org.apache.hadoop.mapreduce.v2.api.records.Phase.valueOf(phase
+        .name());
+  }
+
+  /**
+   * Converts a Tez task-attempt id to an MRv2 one in two hops: Tez to mapred
+   * (via IDConverter), then mapred to MRv2 (via TypeConverter).
+   */
+  public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
+    TaskAttemptID mrTaskAttemptId = IDConverter
+        .toMRTaskAttemptId(taskAttemptId);
+    TaskAttemptId mrv2TaskAttemptId = TypeConverter.toYarn(mrTaskAttemptId);
+    return mrv2TaskAttemptId;
+  }
+
+  /**
+   * Converts an MRv2 task-attempt id to a Tez one; the reverse of
+   * {@link #toYarn(TezTaskAttemptID)}, again going through the mapred id.
+   */
+  public static TezTaskAttemptID toTez(TaskAttemptId taskAttemptId) {
+    TaskAttemptID mrTaskAttemptId = TypeConverter.fromYarn(taskAttemptId);
+    TezTaskAttemptID tezTaskAttemptId = IDConverter
+        .fromMRTaskAttemptId(mrTaskAttemptId);
+    return tezTaskAttemptId;
+  }
+
+  /**
+   * Maps an MRv2 completion-event status onto the Tez status whose constant
+   * name equals the MRv2 status's string form.
+   *
+   * @throws IllegalArgumentException if Tez has no status with that name
+   */
+  public static TezDependentTaskCompletionEvent.Status toTez(
+      TaskAttemptCompletionEventStatus status) {
+    return TezDependentTaskCompletionEvent.Status.valueOf(status.toString());
+  }
+
+  /**
+   * Builds a Tez dependent-task completion event from an MRv2 one, converting
+   * the attempt id and status and passing the event id and map-output server
+   * address through. The boolean argument is true when the attempt's task
+   * type is MAP.
+   */
+  public static TezDependentTaskCompletionEvent toTez(
+      TaskAttemptCompletionEvent event) {
+    return new TezDependentTaskCompletionEvent(event.getEventId(),
+        toTez(event.getAttemptId()), event.getAttemptId().getTaskId()
+            .getTaskType() == TaskType.MAP, toTez(event.getStatus()),
+        event.getMapOutputServerAddress());
+  }
+  
+  /**
+   * Copies Tez counters into a new mapreduce {@link Counters} object, group by
+   * group and counter by counter.
+   *
+   * @param tezCounters counters to copy; may be null
+   * @return the copy, or null when {@code tezCounters} is null
+   */
+  public static Counters fromTez(TezCounters tezCounters) {
+    if (tezCounters == null) {
+      return null;
+    }
+    Counters counters = new Counters();
+    for (CounterGroup xGrp : tezCounters) {
+      counters.addGroup(xGrp.getName(), xGrp.getDisplayName());
+      for (TezCounter xCounter : xGrp) {
+        // Only the value is copied; the counter's display name is not.
+        Counter counter =
+            counters.findCounter(xGrp.getName(), xCounter.getName());
+        counter.setValue(xCounter.getValue());
+
+      }
+    }
+    return counters;
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/JobContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,65 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.records.TezJobID;
+
+/**
+ * mapred-API {@link JobContext} for Tez. It extends the mapreduce
+ * JobContextImpl, translating the Tez job id with {@link IDConverter}, and
+ * additionally keeps hold of the {@link JobConf} and a progress callback.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl 
+    extends org.apache.hadoop.mapreduce.task.JobContextImpl 
+    implements JobContext {
+  private JobConf job;
+  private Progressable progress;
+
+  /**
+   * @param conf job configuration, returned later by {@link #getJobConf()}
+   * @param jobId Tez job id; converted to an MR job id for the superclass
+   * @param progress callback returned later by {@link #getProgressible()}
+   */
+  public JobContextImpl(JobConf conf, TezJobID jobId, 
+                 Progressable progress) {
+    super(conf, IDConverter.toMRJobId(jobId));
+    this.job = conf;
+    this.progress = progress;
+  }
+
+  /**
+   * Creates a context that uses {@code Reporter.NULL} as its progress
+   * callback.
+   */
+  public JobContextImpl(JobConf conf, TezJobID jobId) {
+    this(conf, jobId, Reporter.NULL);
+  }
+  
+  /**
+   * Get the job Configuration
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return job;
+  }
+  
+  /**
+   * Get the progress mechanism for reporting progress.
+   * 
+   * @return progress mechanism 
+   */
+  public Progressable getProgressible() {
+    return progress;
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRCounters.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,312 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+
+public class MRCounters extends org.apache.hadoop.mapred.Counters {
+  private final org.apache.tez.common.counters.TezCounters raw;
+  
+  public MRCounters(org.apache.tez.common.counters.TezCounters raw) {
+    this.raw = raw;
+  }
+
+  @Override
+  public synchronized org.apache.hadoop.mapred.Counters.Group getGroup(String groupName) {
+    return new MRCounterGroup(raw.getGroup(groupName));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized Collection<String> getGroupNames() {
+    return IteratorUtils.toList(raw.getGroupNames().iterator());  }
+
+  @Override
+  public synchronized String makeCompactString() {
+    StringBuilder builder = new StringBuilder();
+    boolean first = true;
+    for(Group group: this){
+      for(Counter counter: group) {
+        if (first) {
+          first = false;
+        } else {
+          builder.append(',');
+        }
+        builder.append(group.getDisplayName());
+        builder.append('.');
+        builder.append(counter.getDisplayName());
+        builder.append(':');
+        builder.append(counter.getCounter());
+      }
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public synchronized Counter findCounter(String group, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  /**
+   * Looks up a counter by group and name in the wrapped Tez counters and
+   * returns it wrapped as a mapred counter.
+   *
+   * @param group group name
+   * @param id ignored by this implementation
+   * @param name counter name
+   * @return a new {@link MRCounter} around the Tez counter
+   */
+  @Override
+  public Counter findCounter(String group, int id, String name) {
+    return new MRCounter(raw.findCounter(group, name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    raw.findCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    raw.findCounter(group, counter).increment(amount);
+  }
+
+  @Override
+  public synchronized long getCounter(Enum<?> key) {
+    return raw.findCounter(key).getValue();
+  }
+
+  @Override
+  public synchronized void incrAllCounters(
+      org.apache.hadoop.mapred.Counters other) {
+    for (Group otherGroup: other) {
+      Group group = getGroup(otherGroup.getName());
+      group.setDisplayName(otherGroup.getDisplayName());
+      for (Counter otherCounter : otherGroup) {
+        Counter counter = group.getCounterForName(otherCounter.getName());
+        counter.setDisplayName(otherCounter.getDisplayName());
+        counter.increment(otherCounter.getValue());
+      }
+    }
+  }
+  
+  @Override
+  public int size() {
+    return countCounters();
+  }
+
+  @Override
+  public void log(Log log) {
+    log.info("Counters: " + size());
+    for(Group group: this) {
+      log.info("  " + group.getDisplayName());
+      for (Counter counter: group) {
+        log.info("    " + counter.getDisplayName() + "=" +
+                 counter.getCounter());
+      }
+    }
+  }
+
+  @Override
+  public String makeEscapedCompactString() {
+    return toEscapedCompactString(this);
+  }
+  
+  public static class MRCounterGroup extends org.apache.hadoop.mapred.Counters.Group {
+    private final org.apache.tez.common.counters.CounterGroup group;
+    public MRCounterGroup(org.apache.tez.common.counters.CounterGroup group) {
+      this.group = group;
+    }
+    @Override
+    public String getName() {
+      return group.getName();
+    }
+    @Override
+    public String getDisplayName() {
+      return group.getDisplayName();
+    }
+    @Override
+    public void setDisplayName(String displayName) {
+      group.setDisplayName(displayName);
+    }
+    @Override
+    public void addCounter(org.apache.hadoop.mapred.Counters.Counter counter) {
+      group.addCounter(convert(counter));
+    }
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter addCounter(String name,
+        String displayName, long value) {
+      return new MRCounter(group.addCounter(name, displayName, value));
+    }
+    @Override
+    public org.apache.hadoop.mapred.Counters.Counter findCounter(
+        String counterName, String displayName) {
+      return new MRCounter(group.findCounter(counterName, displayName));
+    }
+    @Override
+    public int size() {
+      return group.size();
+    }
+    /**
+     * Adds every counter of {@code rightGroup} into the wrapped Tez group:
+     * each one is looked up here by name and display name and incremented by
+     * the other counter's value.
+     * <p>
+     * NOTE(review): assumes the Tez group's findCounter(name, displayName)
+     * returns a usable counter for names it has not seen yet - confirm.
+     *
+     * @param rightGroup group whose counter values are added to this one
+     */
+    @Override
+    public void incrAllCounters(
+        org.apache.hadoop.mapreduce.counters.CounterGroupBase rightGroup) {
+      // Delegating to a fresh MRCounterGroup around the same group would
+      // re-enter this method without end, so the merge is done here.
+      for (Object element : rightGroup) {
+        org.apache.hadoop.mapreduce.Counter right =
+            (org.apache.hadoop.mapreduce.Counter) element;
+        group.findCounter(right.getName(), right.getDisplayName())
+            .increment(right.getValue());
+      }
+    }
+    /**
+     * Returns this adapter itself. The wrapped group is a Tez type, not a
+     * Hadoop CounterGroupBase, so the adapter is the only Hadoop-typed view of
+     * it; asking a new wrapper around the same group would recurse until the
+     * stack overflows.
+     *
+     * @return this group
+     */
+    @Override
+    public org.apache.hadoop.mapreduce.counters.CounterGroupBase 
+    getUnderlyingGroup() {
+      return this;
+    }
+    /**
+     * Does nothing: this adapter reads no state from {@code arg0}.
+     * NOTE(review): presumably the wrapped Tez group is serialized on its own
+     * path - confirm that nothing relies on this method.
+     */
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+    }
+    /**
+     * Does nothing: this adapter writes no state to {@code arg0}.
+     * NOTE(review): presumably the wrapped Tez group is serialized on its own
+     * path - confirm that nothing relies on this method.
+     */
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+    }
+    /**
+     * Iterates over the wrapped group's counters, presenting each one as a
+     * mapred counter. Handing out the Tez iterator directly would yield
+     * TezCounter elements, which for-each loops over mapred counters cannot
+     * cast.
+     *
+     * @return an iterator of {@link MRCounter} views; {@code remove()} is
+     *         forwarded to the wrapped group's iterator
+     */
+    @Override
+    public Iterator iterator() {
+      final Iterator<org.apache.tez.common.counters.TezCounter> rawCounters =
+          group.iterator();
+      return new Iterator<org.apache.hadoop.mapred.Counters.Counter>() {
+        @Override
+        public boolean hasNext() {
+          return rawCounters.hasNext();
+        }
+        @Override
+        public org.apache.hadoop.mapred.Counters.Counter next() {
+          // Wrap lazily, one adapter per element handed out.
+          return new MRCounter(rawCounters.next());
+        }
+        @Override
+        public void remove() {
+          rawCounters.remove();
+        }
+      };
+    }
+  }
+  
+  public static class MRCounter extends Counter {
+    private final org.apache.tez.common.counters.TezCounter raw;
+    
+    public MRCounter(org.apache.tez.common.counters.TezCounter raw) {
+      this.raw = raw;
+    }
+
+    @Override
+    public void setDisplayName(String displayName) {
+      // TODO Auto-generated method stub
+      raw.setDisplayName(displayName);
+    }
+
+    @Override
+    public String getName() {
+      return raw.getName();
+    }
+
+    @Override
+    public String getDisplayName() {
+      return raw.getDisplayName();
+    }
+
+    @Override
+    public long getValue() {
+      return raw.getValue();
+    }
+
+    @Override
+    public void setValue(long value) {
+      raw.setValue(value);
+    }
+
+    @Override
+    public void increment(long incr) {
+      raw.increment(incr);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      raw.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      raw.readFields(in);
+    }
+
+    @Override
+    public String makeEscapedCompactString() {
+      return toEscapedCompactString(new MRCounter(raw));
+    }
+
+    @Deprecated
+    public boolean contentEquals(Counter counter) {
+      MRCounter c = new MRCounter(raw);
+      return c.equals(counter.getUnderlyingCounter());
+    }
+
+
+    @Override
+    public long getCounter() {
+      return raw.getValue();
+    }
+
+    /**
+     * Returns this adapter itself. The wrapped counter is a Tez type, not a
+     * mapreduce Counter, so the adapter is the only mapreduce-typed view of
+     * it; asking a new wrapper around the same counter would recurse until the
+     * stack overflows.
+     *
+     * @return this counter
+     */
+    @Override
+    public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
+      return this;
+    }
+
+    /**
+     * Compares by the wrapped Tez counter. Another {@code MRCounter} is
+     * unwrapped first, so two adapters over equal Tez counters are equal; any
+     * other object is compared with the wrapped counter directly.
+     *
+     * @param genericRight object to compare with; may be null
+     * @return true when the wrapped counter equals the (unwrapped) argument
+     */
+    @Override
+    public synchronized boolean equals(Object genericRight) {
+      if (this == genericRight) {
+        return true;
+      }
+      if (genericRight instanceof MRCounter) {
+        // Unwrapping keeps equals reflexive/symmetric between adapters and
+        // consistent with hashCode(), which is the wrapped counter's hash.
+        return raw.equals(((MRCounter) genericRight).raw);
+      }
+      return raw.equals(genericRight);
+    }
+
+    @Override
+    public int hashCode() {
+      // TODO Auto-generated method stub
+      return raw.hashCode();
+    }
+  }
+  
+  /**
+   * Builds the Tez equivalent of a mapred counter, carrying its current value
+   * across. Framework and file-system counters keep their specialised Tez
+   * form; anything else becomes a generic counter with the same name and
+   * display name.
+   *
+   * @param counter mapred counter to convert; must not be null
+   * @return a new Tez counter holding the same value
+   * @throws IllegalArgumentException if a file-system counter uses a
+   *         FileSystemCounter constant that has no Tez counterpart
+   */
+  static org.apache.tez.common.counters.TezCounter convert(
+      org.apache.hadoop.mapred.Counters.Counter counter) {
+    org.apache.hadoop.mapreduce.Counter underlyingCounter =
+        counter.getUnderlyingCounter();
+    org.apache.tez.common.counters.TezCounter converted;
+    if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter) {
+      org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter 
+      real = 
+      (org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter)underlyingCounter;
+      converted = new org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter(
+          real.getKey(), real.getGroupName());
+      // This constructor takes no value, so copy it explicitly; otherwise
+      // the converted counter would lose the source counter's value.
+      converted.setValue(underlyingCounter.getValue());
+    } else if (underlyingCounter instanceof org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter) {
+      org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter real = 
+          (org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup.FSCounter)underlyingCounter;
+      converted = new org.apache.tez.common.counters.FileSystemCounterGroup.FSCounter(
+          real.getScheme(), convert(real.getFileSystemCounter()));
+      // Same as above: the value is not part of the constructor.
+      converted.setValue(underlyingCounter.getValue());
+    } else {
+      converted = new org.apache.tez.common.counters.GenericCounter(
+          underlyingCounter.getName(), 
+          underlyingCounter.getDisplayName(), 
+          underlyingCounter.getValue());
+    }
+    return converted;
+  }
+  
+  /**
+   * Maps a mapreduce FileSystemCounter constant onto the Tez constant of the
+   * same name.
+   *
+   * @param c mapreduce file-system counter key; must not be null
+   * @return the corresponding Tez file-system counter key
+   * @throws IllegalArgumentException if {@code c} is not one of the five
+   *         constants handled below
+   */
+  static org.apache.tez.common.counters.FileSystemCounter convert(
+      org.apache.hadoop.mapreduce.FileSystemCounter c) {
+    switch (c) {
+      case BYTES_READ:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_READ;
+      case BYTES_WRITTEN:
+        return org.apache.tez.common.counters.FileSystemCounter.BYTES_WRITTEN;
+      case READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.READ_OPS;
+      case LARGE_READ_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.LARGE_READ_OPS;
+      case WRITE_OPS:
+        return org.apache.tez.common.counters.FileSystemCounter.WRITE_OPS;
+      default:
+        // Reached only if the mapreduce enum gains a constant not listed above.
+        throw new IllegalArgumentException("Unknow FileSystemCounter: " + c);
+    }
+    
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/TaskAttemptContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,107 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.records.TezTaskAttemptID;
+
+/**
+ * Old-API ({@code org.apache.hadoop.mapred}) task attempt context that is
+ * built from a Tez task attempt id and an optional {@link MRTaskReporter}.
+ *
+ * <p>The two-argument constructor supplies no reporter. In that case the
+ * progress, status and counter methods do not dereference the missing
+ * reporter: they fall back to the superclass implementation or do nothing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
+       implements TaskAttemptContext {
+  /** Reporter used for progress/status/counters; null when none was given. */
+  private final MRTaskReporter reporter;
+
+  /**
+   * Create a context without a reporter.
+   *
+   * @param conf the job configuration
+   * @param taskid the Tez task attempt id to translate into a mapred id
+   */
+  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskid) {
+    this(conf, taskid, null);
+  }
+  
+  /**
+   * Create a context, translating the Tez task attempt id into a mapred
+   * {@link TaskAttemptID}.
+   *
+   * @param conf the job configuration
+   * @param taskAttemptId the Tez task attempt id
+   * @param reporter the reporter to delegate to, or null for none
+   */
+  @SuppressWarnings("deprecation")
+  public TaskAttemptContextImpl(JobConf conf, TezTaskAttemptID taskAttemptId,
+                         MRTaskReporter reporter) {
+    super(conf, 
+        new TaskAttemptID(
+            new TaskID(
+                taskAttemptId.getJobID().getJtIdentifier(), 
+                taskAttemptId.getJobID().getId(), 
+                // NOTE(review): assumes getTaskType() yields a value that
+                // equals MRTaskType.MAP.toString() for map tasks - confirm.
+                taskAttemptId.getTaskType().equals(MRTaskType.MAP.toString()), 
+                taskAttemptId.getTaskID().getId()), 
+              taskAttemptId.getId()));
+    this.reporter = reporter;
+  }
+  
+  /**
+   * Get the taskAttemptID.
+   *  
+   * @return TaskAttemptID
+   */
+  @Override
+  public TaskAttemptID getTaskAttemptID() {
+    return (TaskAttemptID) super.getTaskAttemptID();
+  }
+  
+  /**
+   * @return the reporter given at construction time; null if none was given
+   */
+  public Progressable getProgressible() {
+    return reporter;
+  }
+  
+  /**
+   * @return the configuration of this context, cast to {@link JobConf}
+   */
+  public JobConf getJobConf() {
+    return (JobConf) getConfiguration();
+  }
+  
+  /**
+   * @return the reporter's progress, or the superclass's value when no
+   *         reporter was given
+   */
+  @Override
+  public float getProgress() {
+    if (reporter == null) {
+      return super.getProgress();
+    }
+    return reporter.getProgress();
+  }
+
+  /**
+   * Look up a counter by enum name through the reporter, or through the
+   * superclass when no reporter was given.
+   */
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    if (reporter == null) {
+      return super.getCounter(counterName);
+    }
+    return (Counter) reporter.getCounter(counterName);
+  }
+
+  /**
+   * Look up a counter by group and name through the reporter, or through the
+   * superclass when no reporter was given.
+   */
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    if (reporter == null) {
+      return super.getCounter(groupName, counterName);
+    }
+    return (Counter) reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress. Does nothing when no reporter was given.
+   */
+  @Override
+  public void progress() {
+    if (reporter != null) {
+      reporter.progress();
+    }
+  }
+
+  /**
+   * Set the current status of the task to the given string. The status is
+   * always recorded on this context and is forwarded to the reporter only
+   * when one was given.
+   */
+  @Override
+  public void setStatus(String status) {
+    setStatusString(status);
+    if (reporter != null) {
+      reporter.setStatus(status);
+    }
+  }
+
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/JobContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.records.TezJobID;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobContextImpl implements JobContext {
+
+  protected final org.apache.hadoop.mapred.JobConf conf;
+  private TezJobID jobId;
+  /**
+   * The UserGroupInformation object that has a reference to the current user
+   */
+  protected UserGroupInformation ugi;
+  protected final Credentials credentials;
+  private Progressable progress;
+
+  public JobContextImpl(Configuration conf, TezJobID jobId) {
+    this(conf, jobId, MRTaskReporter.NULL);
+  }
+  
+  /**
+   * Create a job context.
+   *
+   * @param conf the job configuration; used directly when it already is a
+   *        {@link JobConf}, otherwise a new {@link JobConf} is built from it
+   * @param jobId the Tez job id
+   * @param progress the object returned by {@link #getProgressible()}
+   * @throws RuntimeException wrapping the {@link IOException} raised when the
+   *         current user cannot be determined
+   */
+  public JobContextImpl(Configuration conf, TezJobID jobId, Progressable progress) {
+    this.conf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
+    this.jobId = jobId;
+    this.progress = progress;
+    // Credentials are read from the (possibly newly built) JobConf.
+    this.credentials = this.conf.getCredentials();
+    try {
+      this.ugi = UserGroupInformation.getCurrentUser();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Return the configuration for the job.
+   * @return the shared configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get the unique ID for the job.
+   * @return the object with the job id
+   */
+  public JobID getJobID() {
+    return IDConverter.toMRJobId(jobId);
+  }
+  
+  /**
+   * Set the JobID.
+   */
+  public void setJobID(JobID jobId) {
+    this.jobId = IDConverter.fromMRJobId(jobId);
+  }
+  
+  /**
+   * Get the configured number of reduce tasks for this job. Defaults to 
+   * <code>1</code>.
+   * @return the number of reduce tasks for this job.
+   */
+  public int getNumReduceTasks() {
+    return conf.getNumReduceTasks();
+  }
+  
+  /**
+   * Get the current working directory for the default file system.
+   * 
+   * @return the directory name.
+   */
+  public Path getWorkingDirectory() throws IOException {
+    return conf.getWorkingDirectory();
+  }
+
+  /**
+   * Get the key class for the job output data.
+   * @return the key class for the job output data.
+   */
+  public Class<?> getOutputKeyClass() {
+    return conf.getOutputKeyClass();
+  }
+  
+  /**
+   * Get the value class for job outputs.
+   * @return the value class for job outputs.
+   */
+  public Class<?> getOutputValueClass() {
+    return conf.getOutputValueClass();
+  }
+
+  /**
+   * Get the key class for the map output data. If it is not set, use the
+   * (final) output key class. This allows the map output key class to be
+   * different than the final output key class.
+   * @return the map output key class.
+   */
+  public Class<?> getMapOutputKeyClass() {
+    return conf.getMapOutputKeyClass();
+  }
+
+  /**
+   * Get the value class for the map output data. If it is not set, use the
+   * (final) output value class. This allows the map output value class to be
+   * different than the final output value class.
+   *  
+   * @return the map output value class.
+   */
+  public Class<?> getMapOutputValueClass() {
+    return conf.getMapOutputValueClass();
+  }
+
+  /**
+   * Get the user-specified job name. This is only used to identify the 
+   * job to the user.
+   * 
+   * @return the job's name, defaulting to "".
+   */
+  public String getJobName() {
+    return conf.getJobName();
+  }
+
+  /**
+   * Get the {@link InputFormat} class for the job.
+   * 
+   * @return the {@link InputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends InputFormat<?,?>>) 
+      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
+  }
+
+  /**
+   * Get the {@link Mapper} class for the job.
+   * 
+   * @return the {@link Mapper} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Mapper<?,?,?,?>>) 
+      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+  }
+
+  /**
+   * Get the combiner class for the job.
+   * 
+   * @return the combiner class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(COMBINE_CLASS_ATTR, null);
+  }
+
+  /**
+   * Get the {@link Reducer} class for the job.
+   * 
+   * @return the {@link Reducer} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+  }
+
+  /**
+   * Get the {@link OutputFormat} class for the job.
+   * 
+   * @return the {@link OutputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends OutputFormat<?,?>>) 
+      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
+  }
+
+  /**
+   * Get the {@link Partitioner} class for the job.
+   * 
+   * @return the {@link Partitioner} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Partitioner<?,?>>) 
+      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
+  }
+
+  /**
+   * Get the {@link RawComparator} comparator used to compare keys.
+   * 
+   * @return the {@link RawComparator} comparator used to compare keys.
+   */
+  public RawComparator<?> getSortComparator() {
+    return conf.getOutputKeyComparator();
+  }
+
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  public String getJar() {
+    return conf.getJar();
+  }
+
+  /** 
+   * Get the user defined {@link RawComparator} comparator for 
+   * grouping keys of inputs to the reduce.
+   * 
+   * @return comparator set by the user for grouping values.
+   * @see Job#setGroupingComparatorClass(Class) for details.  
+   */
+  public RawComparator<?> getGroupingComparator() {
+    return conf.getOutputValueGroupingComparator();
+  }
+  
+  /**
+   * Get whether job-setup and job-cleanup are needed for the job.
+   * 
+   * @return boolean 
+   */
+  public boolean getJobSetupCleanupNeeded() {
+    return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+  }
+  
+  /**
+   * Get whether task-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getTaskCleanupNeeded() {
+    return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
+  }
+
+  /**
+   * This method checks to see if symlinks are to be created for the 
+   * localized cache files in the current working directory 
+   * @return true if symlinks are to be created- else return false
+   */
+  public boolean getSymlink() {
+    return DistributedCache.getSymlink(conf);
+  }
+  
+  /**
+   * Get the archive entries in classpath as an array of Path
+   */
+  public Path[] getArchiveClassPaths() {
+    return DistributedCache.getArchiveClassPaths(conf);
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheArchives() throws IOException {
+    return DistributedCache.getCacheArchives(conf);
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public URI[] getCacheFiles() throws IOException {
+    return DistributedCache.getCacheFiles(conf);
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public Path[] getLocalCacheArchives()
+    throws IOException {
+    return DistributedCache.getLocalCacheArchives(conf);
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public Path[] getLocalCacheFiles()
+    throws IOException {
+    return DistributedCache.getLocalCacheFiles(conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   */
+  public Path[] getFileClassPaths() {
+    return DistributedCache.getFileClassPaths(conf);
+  }
+  
+  /**
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code. Delegates to
+   * {@code DistributedCache.getArchiveTimestamps} with this job's
+   * configuration.
+   * @return a string array of timestamps
+   */
+  public String[] getArchiveTimestamps() {
+    return DistributedCache.getArchiveTimestamps(conf);
+  }
+
+  /**
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code. Delegates to
+   * {@code DistributedCache.getFileTimestamps} with this job's
+   * configuration.
+   * @return a string array of timestamps
+   */
+  public String[] getFileTimestamps() {
+    return DistributedCache.getFileTimestamps(conf);
+  }
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
+   */
+  public int getMaxMapAttempts() {
+    return conf.getMaxMapAttempts();
+  }
+
+  /** 
+   * Get the configured number of maximum attempts  that will be made to run a
+   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
+   */
+  public int getMaxReduceAttempts() {
+    return conf.getMaxReduceAttempts();
+  }
+
+  /**
+   * Get whether the task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled() {
+    return conf.getProfileEnabled();
+  }
+
+  /**
+   * Get the profiler configuration arguments.
+   *
+   * The default value for this property is
+   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+   * 
+   * @return the parameters to pass to the task child to configure profiling
+   */
+  public String getProfileParams() {
+    return conf.getProfileParams();
+  }
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return conf.getProfileTaskRange(isMap);
+  }
+
+  /**
+   * Get the reported username for this job.
+   * 
+   * @return the username
+   */
+  public String getUser() {
+    return conf.getUser();
+  }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  @Override
+  public JobConf getJobConf() {
+    return conf;
+  }
+
+  @Override
+  public Progressable getProgressible() {
+    return progress;
+  }
+  
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context handed to a {@link Mapper}. Input records are pulled from the
+ * {@link RecordReader} supplied at construction time, and the
+ * {@link InputSplit} supplied alongside it is exposed to the mapper.
+ *
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  /** The split returned by {@link #getInputSplit()}. */
+  private final InputSplit inputSplit;
+  /** Source of the key/value pairs served to the mapper. */
+  private final RecordReader<KEYIN,VALUEIN> recordReader;
+
+  /**
+   * @param conf the job configuration
+   * @param taskid the id of the task attempt
+   * @param reader the reader the input key/value pairs are taken from
+   * @param writer passed through to the superclass constructor
+   * @param committer passed through to the superclass constructor
+   * @param reporter passed through to the superclass constructor
+   * @param split the input split for this map
+   */
+  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
+                        RecordReader<KEYIN,VALUEIN> reader,
+                        RecordWriter<KEYOUT,VALUEOUT> writer,
+                        OutputCommitter committer,
+                        MRTaskReporter reporter,
+                        InputSplit split) {
+    super(conf, taskid, writer, committer, reporter);
+    this.inputSplit = split;
+    this.recordReader = reader;
+  }
+
+  /**
+   * Get the input split for this map.
+   *
+   * @return the split given to the constructor
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /** @return the key the underlying reader is currently positioned on */
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return recordReader.getCurrentKey();
+  }
+
+  /** @return the value the underlying reader is currently positioned on */
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return recordReader.getCurrentValue();
+  }
+
+  /**
+   * Advance the underlying reader.
+   *
+   * @return whatever the reader's {@code nextKeyValue()} returns
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return recordReader.nextKeyValue();
+  }
+
+}
+     

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RawKeyValueIterator input;
+  private Counter inputValueCounter;
+  private Counter inputKeyCounter;
+  private RawComparator<KEYIN> comparator;
+  private KEYIN key;                                  // current key
+  private VALUEIN value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer<KEYIN> keyDeserializer;
+  private Deserializer<VALUEIN> valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+  private boolean isMarked = false;
+  private BackupStore<KEYIN,VALUEIN> backupStore;
+  private final SerializationFactory serializationFactory;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
+  private final Configuration conf;
+  private final TaskAttemptID taskid;
+  private int currentKeyLength = -1;
+  private int currentValueLength = -1;
+  
+  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
+                           RawKeyValueIterator input, 
+                           Counter inputKeyCounter,
+                           Counter inputValueCounter,
+                           RecordWriter<KEYOUT,VALUEOUT> output,
+                           OutputCommitter committer,
+                           MRTaskReporter reporter,
+                           RawComparator<KEYIN> comparator,
+                           Class<KEYIN> keyClass,
+                           Class<VALUEIN> valueClass
+                          ) throws InterruptedException, IOException{
+    super(conf, taskid, output, committer, reporter);
+    this.input = input;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    this.comparator = comparator;
+    this.serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(buffer);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer.open(buffer);
+    hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+
+  /**
+   * Start processing the next unique key. Any values still pending for the
+   * current key are skipped first. If input remains, the key counter (when
+   * present) is incremented and the first pair of the new key is loaded.
+   *
+   * @return false when the input is exhausted, otherwise the result of
+   *         {@link #nextKeyValue()}
+   */
+  public boolean nextKey() throws IOException,InterruptedException {
+    // Drain the remaining values that belong to the key being left behind.
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (!hasMore) {
+      return false;
+    }
+    if (inputKeyCounter != null) {
+      inputKeyCounter.increment(1);
+    }
+    return nextKeyValue();
+  }
+
+  /**
+   * Advance to the next key/value pair.
+   *
+   * <p>Deserializes the pair the raw iterator is positioned on, copies it to
+   * the backup store while a mark is active, then moves the raw iterator one
+   * record ahead to find out whether the following record has the same key.
+   *
+   * @return false (with key and value cleared) when the input is exhausted,
+   *         true otherwise
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+
+    DataInputBuffer nextKey = input.getKey();
+    final int keyLength = nextKey.getLength() - nextKey.getPosition();
+    // Keep a private copy of the raw key; it is compared against the
+    // following record's key after the iterator has advanced.
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), keyLength);
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+
+    DataInputBuffer nextVal = input.getValue();
+    final int valueLength = nextVal.getLength() - nextVal.getPosition();
+    // Hand the deserializer exactly the value's bytes. The key path and
+    // currentValueLength both use getLength() - getPosition() as the byte
+    // count, so the value buffer uses the same expression.
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), valueLength);
+    value = valueDeserializer.deserialize(value);
+
+    currentKeyLength = keyLength;
+    currentValueLength = valueLength;
+
+    if (isMarked) {
+      backupStore.write(nextKey, nextVal);
+    }
+
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    // Guarded like inputKeyCounter in nextKey(): a missing counter must not
+    // fail record iteration.
+    if (inputValueCounter != null) {
+      inputValueCounter.increment(1);
+    }
+    return true;
+  }
+
+  public KEYIN getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return value;
+  }
+  
+  BackupStore<KEYIN,VALUEIN> getBackupStore() {
+    return backupStore;
+  }
+  
+  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
+
+    private boolean inReset = false;
+    private boolean clearMarkFlag = false;
+
+    /**
+     * Report whether another value is available for the current key.
+     *
+     * <p>While replaying after a reset, values still held by the backup
+     * store count. Otherwise a value is available if the current record has
+     * not been served yet or the following record has the same key.
+     *
+     * @throws RuntimeException wrapping any failure raised while querying
+     *         the backup store
+     */
+    public boolean hasNext() {
+      try {
+        if (inReset && backupStore.hasNext()) {
+          return true;
+        } 
+      } catch (Exception e) {
+        // The cause travels with the rethrown exception, so it is not also
+        // dumped to stderr here.
+        throw new RuntimeException("hasNext failed", e);
+      }
+      return firstValue || nextKeyIsSame;
+    }
+
+    /**
+     * Return the next value for the current key.
+     *
+     * <p>While replaying after a reset, values come from the backup store
+     * until it is drained; after that, iteration continues on the live
+     * input. The returned object is the context's reused value instance.
+     *
+     * @return the next value
+     * @throws NoSuchElementException if the values of the current key are
+     *         exhausted
+     * @throws RuntimeException wrapping an {@link IOException} or
+     *         {@link InterruptedException} raised while reading
+     */
+    public VALUEIN next() {
+      if (inReset) {
+        try {
+          if (backupStore.hasNext()) {
+            backupStore.next();
+            DataInputBuffer next = backupStore.nextValue();
+            // Pass the value's byte count (getLength() - getPosition()), the
+            // same expression nextKeyValue() uses for currentValueLength.
+            buffer.reset(next.getData(), next.getPosition(),
+                         next.getLength() - next.getPosition());
+            value = valueDeserializer.deserialize(value);
+            return value;
+          } else {
+            // Backup store drained: leave replay mode and fall through to
+            // the live input below.
+            inReset = false;
+            backupStore.exitResetMode();
+            if (clearMarkFlag) {
+              clearMarkFlag = false;
+              isMarked = false;
+            }
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("next value iterator failed", e);
+        }
+      } 
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, they
+      // can't advance it here.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // java.util.Iterator cannot declare checked exceptions, so wrap it,
+        // but restore the interrupt flag so callers can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("next value iterator interrupted", ie);        
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+
+    public void mark() throws IOException {
+      if (getBackupStore() == null) {
+        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+      }
+      isMarked = true;
+      if (!inReset) {
+        backupStore.reinitialize();
+        if (currentKeyLength == -1) {
+          // The user has not called next() for this iterator yet, so
+          // there is no current record to mark and copy to backup store.
+          return;
+        }
+        assert (currentValueLength != -1);
+        int requestedSize = currentKeyLength + currentValueLength + 
+          WritableUtils.getVIntSize(currentKeyLength) +
+          WritableUtils.getVIntSize(currentValueLength);
+        DataOutputStream out = backupStore.getOutputStream(requestedSize);
+        writeFirstKeyValueBytes(out);
+        backupStore.updateCounters(requestedSize);
+      } else {
+        backupStore.mark();
+      }
+    }
+
+    public void reset() throws IOException {
+      // We reached the end of an iteration and user calls a 
+      // reset, but a clearMark was called before, just throw
+      // an exception
+      if (clearMarkFlag) {
+        clearMarkFlag = false;
+        backupStore.clearMark();
+        throw new IOException("Reset called without a previous mark");
+      }
+      
+      if (!isMarked) {
+        throw new IOException("Reset called without a previous mark");
+      }
+      inReset = true;
+      backupStore.reset();
+    }
+
+    public void clearMark() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      if (inReset) {
+        clearMarkFlag = true;
+        backupStore.clearMark();
+      } else {
+        inReset = isMarked = false;
+        backupStore.reinitialize();
+      }
+    }
+    
+    /**
+     * This method is called when the reducer moves from one key to 
+     * another.
+     * @throws IOException
+     */
+    public void resetBackupStore() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      inReset = isMarked = false;
+      backupStore.reinitialize();
+      currentKeyLength = -1;
+    }
+
+    /**
+     * This method is called to write the record that was most recently
+     * served (before a call to the mark). Since the framework reads one
+     * record in advance, to get this record, we serialize the current key
+     * and value
+     * @param out
+     * @throws IOException
+     */
+    private void writeFirstKeyValueBytes(DataOutputStream out) 
+    throws IOException {
+      assert (getCurrentKey() != null && getCurrentValue() != null);
+      WritableUtils.writeVInt(out, currentKeyLength);
+      WritableUtils.writeVInt(out, currentValueLength);
+      Serializer<KEYIN> keySerializer = 
+        serializationFactory.getSerializer(keyClass);
+      keySerializer.open(out);
+      keySerializer.serialize(getCurrentKey());
+
+      Serializer<VALUEIN> valueSerializer = 
+        serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(out);
+      valueSerializer.serialize(getCurrentValue());
+    }
+  }
+
+  protected class ValueIterable implements Iterable<VALUEIN> {
+    private ValueIterator iterator = new ValueIterator();
+    public Iterator<VALUEIN> iterator() {
+      return iterator;
+    } 
+  }
+  
+  /**
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
+   */
+  public 
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return iterable;
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context for task attempts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl extends JobContextImpl 
+    implements TaskAttemptContext {
+  private final TaskAttemptID taskId;
+  private String status = "";
+  private MRTaskReporter reporter;
+
+  public TaskAttemptContextImpl(Configuration conf, 
+                                TaskAttemptID taskId) {
+    this(conf, taskId, null);
+  }
+
+  public TaskAttemptContextImpl(Configuration conf, 
+      TaskAttemptID taskId, MRTaskReporter reporter) {
+    super(conf, IDConverter.fromMRJobId(taskId.getJobID()));
+    this.taskId = taskId;
+    this.reporter = reporter;
+  }
+
+  /**
+   * Get the unique name for this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskId;
+  }
+
+  /**
+   * Get the last set status message.
+   * @return the current status message
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  public Counter getCounter(Enum<?> counterName) {
+    return (Counter) reporter.getCounter(counterName);
+  }
+
+  public Counter getCounter(String groupName, String counterName) {
+    return (Counter) reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress.
+   */
+  public void progress() {
+    reporter.progress();
+  }
+
+  protected void setStatusString(String status) {
+    this.status = status;
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  public void setStatus(String status) {
+    String normalizedStatus = Task.normalizeStatus(status, conf);
+    setStatusString(normalizedStatus);
+    reporter.setStatus(normalizedStatus);
+  }
+
+  public static class DummyReporter extends StatusReporter {
+    public void setStatus(String s) {
+    }
+    public void progress() {
+    }
+    public Counter getCounter(Enum<?> name) {
+      return new Counters().findCounter(name);
+    }
+    public Counter getCounter(String group, String name) {
+      return new Counters().findCounter(group, name);
+    }
+    public float getProgress() {
+      return 0f;
+    }
+  }
+  
+  public float getProgress() {
+    return reporter.getProgress();
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+       extends TaskAttemptContextImpl 
+       implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RecordWriter<KEYOUT,VALUEOUT> output;
+  private OutputCommitter committer;
+
+  public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
+                                    RecordWriter<KEYOUT,VALUEOUT> output,
+                                    OutputCommitter committer,
+                                    MRTaskReporter reporter) {
+    super(conf, taskid, reporter);
+    this.output = output;
+    this.committer = committer;
+  }
+
+  /**
+   * Advance to the next key, value pair, returning null if at end.
+   * @return the key object that was read into, or null if no more
+   */
+  public abstract 
+  boolean nextKeyValue() throws IOException, InterruptedException;
+ 
+  /**
+   * Get the current key.
+   * @return the current key object or null if there isn't one
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+  KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+  /**
+   * Get the current value.
+   * @return the value object that was read into
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN getCurrentValue() throws IOException, 
+                                                   InterruptedException;
+
+  /**
+   * Generate an output key/value pair.
+   */
+  public void write(KEYOUT key, VALUEOUT value
+                    ) throws IOException, InterruptedException {
+    output.write(key, value);
+  }
+
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class ProceedToCompletionResponse implements Writable{
+
+  private boolean shouldDie;
+  private boolean readyToProceed;
+
+  public ProceedToCompletionResponse() {
+  }
+  
+  public ProceedToCompletionResponse(boolean shouldDie, boolean readyToProceed) {
+    this.shouldDie = shouldDie;
+    this.readyToProceed = readyToProceed;
+  }
+
+  /**
+   * Indicates whether the task is required to proceed to completion, or should
+   * terminate.
+   * 
+   * @return
+   */
+  public boolean shouldDie() {
+    return this.shouldDie;
+  }
+  
+  /**
+   * Indicates whether the task is ready to proceed. Valid only if shouldDie is
+   * false.
+   * 
+   * @return
+   */
+  public boolean readyToProceed() {
+    return this.readyToProceed;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    out.writeBoolean(readyToProceed);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    readyToProceed = in.readBoolean();
+  }
+
+  @Override
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", readyToProceed: " + readyToProceed;
+  }
+}