Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [33/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RecordReader<KEYIN,VALUEIN> reader;
+  private InputSplit split;
+
+  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
+                        RecordReader<KEYIN,VALUEIN> reader,
+                        RecordWriter<KEYOUT,VALUEOUT> writer,
+                        OutputCommitter committer,
+                        MRTaskReporter reporter,
+                        InputSplit split) {
+    super(conf, taskid, writer, committer, reporter);
+    this.reader = reader;
+    this.split = split;
+  }
+
+  /**
+   * Get the input split for this map.
+   */
+  public InputSplit getInputSplit() {
+    return split;
+  }
+
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return reader.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return reader.nextKeyValue();
+  }
+
+}
+     

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native
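
For orientation, a hedged sketch of how MapContextImpl is typically assembled
and driven; conf, taskAttemptId, reader, writer, committer, reporter and split
stand in for objects the Tez task runtime would supply:

    MapContextImpl<LongWritable, Text, Text, IntWritable> ctx =
        new MapContextImpl<LongWritable, Text, Text, IntWritable>(
            conf, taskAttemptId, reader, writer, committer, reporter, split);
    while (ctx.nextKeyValue()) {          // delegates to reader.nextKeyValue()
      ctx.write(new Text(ctx.getCurrentValue().toString()), new IntWritable(1));
    }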

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RawKeyValueIterator input;
+  private Counter inputValueCounter;
+  private Counter inputKeyCounter;
+  private RawComparator<KEYIN> comparator;
+  private KEYIN key;                                  // current key
+  private VALUEIN value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer<KEYIN> keyDeserializer;
+  private Deserializer<VALUEIN> valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+  private boolean isMarked = false;
+  private BackupStore<KEYIN,VALUEIN> backupStore;
+  private final SerializationFactory serializationFactory;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
+  private final Configuration conf;
+  private final TaskAttemptID taskid;
+  private int currentKeyLength = -1;
+  private int currentValueLength = -1;
+  
+  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
+                           RawKeyValueIterator input, 
+                           Counter inputKeyCounter,
+                           Counter inputValueCounter,
+                           RecordWriter<KEYOUT,VALUEOUT> output,
+                           OutputCommitter committer,
+                           MRTaskReporter reporter,
+                           RawComparator<KEYIN> comparator,
+                           Class<KEYIN> keyClass,
+                           Class<VALUEIN> valueClass
+                          ) throws InterruptedException, IOException {
+    super(conf, taskid, output, committer, reporter);
+    this.input = input;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    this.comparator = comparator;
+    this.serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(buffer);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer.open(buffer);
+    hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+
+  /** Start processing next unique key. */
+  public boolean nextKey() throws IOException,InterruptedException {
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
+      return nextKeyValue();
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Advance to the next key/value pair.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+    DataInputBuffer nextKey = input.getKey();
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
+                      nextKey.getLength() - nextKey.getPosition());
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+    DataInputBuffer nextVal = input.getValue();
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
+    value = valueDeserializer.deserialize(value);
+
+    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
+    currentValueLength = nextVal.getLength() - nextVal.getPosition();
+
+    if (isMarked) {
+      backupStore.write(nextKey, nextVal);
+    }
+
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    inputValueCounter.increment(1);
+    return true;
+  }
+
+  @Override
+  public KEYIN getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return value;
+  }
+  
+  BackupStore<KEYIN,VALUEIN> getBackupStore() {
+    return backupStore;
+  }
+  
+  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
+
+    private boolean inReset = false;
+    private boolean clearMarkFlag = false;
+
+    public boolean hasNext() {
+      try {
+        if (inReset && backupStore.hasNext()) {
+          return true;
+        } 
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException("hasNext failed", e);
+      }
+      return firstValue || nextKeyIsSame;
+    }
+
+    public VALUEIN next() {
+      if (inReset) {
+        try {
+          if (backupStore.hasNext()) {
+            backupStore.next();
+            DataInputBuffer next = backupStore.nextValue();
+            buffer.reset(next.getData(), next.getPosition(), next.getLength());
+            value = valueDeserializer.deserialize(value);
+            return value;
+          } else {
+            inReset = false;
+            backupStore.exitResetMode();
+            if (clearMarkFlag) {
+              clearMarkFlag = false;
+              isMarked = false;
+            }
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException("next value iterator failed", e);
+        }
+      } 
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, the
+      // iterator can't advance any further for this key.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // this is bad, but we can't modify the exception list of java.util
+        throw new RuntimeException("next value iterator interrupted", ie);        
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+
+    public void mark() throws IOException {
+      if (getBackupStore() == null) {
+        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+      }
+      isMarked = true;
+      if (!inReset) {
+        backupStore.reinitialize();
+        if (currentKeyLength == -1) {
+          // The user has not called next() for this iterator yet, so
+          // there is no current record to mark and copy to backup store.
+          return;
+        }
+        assert (currentValueLength != -1);
+        int requestedSize = currentKeyLength + currentValueLength + 
+          WritableUtils.getVIntSize(currentKeyLength) +
+          WritableUtils.getVIntSize(currentValueLength);
+        DataOutputStream out = backupStore.getOutputStream(requestedSize);
+        writeFirstKeyValueBytes(out);
+        backupStore.updateCounters(requestedSize);
+      } else {
+        backupStore.mark();
+      }
+    }
+
+    public void reset() throws IOException {
+      // The user called reset() after a clearMark() was issued during
+      // this reset pass: clear the mark in the backup store and report
+      // that no valid mark exists
+      if (clearMarkFlag) {
+        clearMarkFlag = false;
+        backupStore.clearMark();
+        throw new IOException("Reset called without a previous mark");
+      }
+      
+      if (!isMarked) {
+        throw new IOException("Reset called without a previous mark");
+      }
+      inReset = true;
+      backupStore.reset();
+    }
+
+    public void clearMark() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      if (inReset) {
+        clearMarkFlag = true;
+        backupStore.clearMark();
+      } else {
+        inReset = isMarked = false;
+        backupStore.reinitialize();
+      }
+    }
+    
+    /**
+     * This method is called when the reducer moves from one key to 
+     * another.
+     * @throws IOException
+     */
+    public void resetBackupStore() throws IOException {
+      if (getBackupStore() == null) {
+        return;
+      }
+      inReset = isMarked = false;
+      backupStore.reinitialize();
+      currentKeyLength = -1;
+    }
+
+    /**
+     * Writes the record that was most recently served, i.e. the record that
+     * was current when mark() was called. Since the framework reads one
+     * record ahead, that record is recovered by serializing the current key
+     * and value.
+     * @param out the backup store stream to write the key/value bytes to
+     * @throws IOException
+     */
+    private void writeFirstKeyValueBytes(DataOutputStream out) 
+    throws IOException {
+      assert (getCurrentKey() != null && getCurrentValue() != null);
+      WritableUtils.writeVInt(out, currentKeyLength);
+      WritableUtils.writeVInt(out, currentValueLength);
+      Serializer<KEYIN> keySerializer = 
+        serializationFactory.getSerializer(keyClass);
+      keySerializer.open(out);
+      keySerializer.serialize(getCurrentKey());
+
+      Serializer<VALUEIN> valueSerializer = 
+        serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(out);
+      valueSerializer.serialize(getCurrentValue());
+    }
+  }
+
+  protected class ValueIterable implements Iterable<VALUEIN> {
+    private ValueIterator iterator = new ValueIterator();
+    public Iterator<VALUEIN> iterator() {
+      return iterator;
+    } 
+  }
+  
+  /**
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
+   */
+  public 
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return iterable;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/ReduceContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native
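
The ValueIterator above supports mark()/reset() backed by a BackupStore, so a
reducer can replay the values of the current key. A hedged sketch from inside
a hypothetical reduce() method (the cast is safe because getValues() hands out
this ValueIterator; firstPass/secondPass are placeholders):

    ReduceContext.ValueIterator<VALUEIN> values =
        (ReduceContext.ValueIterator<VALUEIN>) context.getValues().iterator();
    values.mark();                   // snapshot the current record
    while (values.hasNext()) {
      firstPass(values.next());
    }
    values.reset();                  // rewind to the marked record
    while (values.hasNext()) {
      secondPass(values.next());
    }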

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * The context for task attempts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl extends JobContextImpl 
+    implements TaskAttemptContext {
+  private final TaskAttemptID taskId;
+  private String status = "";
+  private MRTaskReporter reporter;
+
+  public TaskAttemptContextImpl(Configuration conf, 
+                                TaskAttemptID taskId) {
+    this(conf, taskId, null);
+  }
+
+  public TaskAttemptContextImpl(Configuration conf,
+      TaskAttemptID taskId, MRTaskReporter reporter) {
+    super(conf, IDConverter.fromMRJobId(taskId.getJobID()));
+    this.taskId = taskId;
+    this.reporter = reporter;
+  }
+
+  /**
+   * Get the unique name for this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskId;
+  }
+
+  /**
+   * Get the last set status message.
+   * @return the current status message
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  public Counter getCounter(Enum<?> counterName) {
+    return (Counter) reporter.getCounter(counterName);
+  }
+
+  public Counter getCounter(String groupName, String counterName) {
+    return (Counter) reporter.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Report progress.
+   */
+  public void progress() {
+    reporter.progress();
+  }
+
+  protected void setStatusString(String status) {
+    this.status = status;
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  public void setStatus(String status) {
+    String normalizedStatus = Task.normalizeStatus(status, conf);
+    setStatusString(normalizedStatus);
+    reporter.setStatus(normalizedStatus);
+  }
+
+  public static class DummyReporter extends StatusReporter {
+    public void setStatus(String s) {
+    }
+    public void progress() {
+    }
+    public Counter getCounter(Enum<?> name) {
+      return new Counters().findCounter(name);
+    }
+    public Counter getCounter(String group, String name) {
+      return new Counters().findCounter(group, name);
+    }
+    public float getProgress() {
+      return 0f;
+    }
+  }
+  
+  public float getProgress() {
+    return reporter.getProgress();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native
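
One behavior worth noting: DummyReporter builds a fresh Counters object on
every getCounter() call, so increments made through it are discarded. A small
self-contained illustration:

    StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
    Counter c = reporter.getCounter("demo", "records");
    c.increment(1);
    // The second lookup comes from a brand-new Counters instance:
    System.out.println(reporter.getCounter("demo", "records").getValue()); // 0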

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+       extends TaskAttemptContextImpl 
+       implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RecordWriter<KEYOUT,VALUEOUT> output;
+  private OutputCommitter committer;
+
+  public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
+                                    RecordWriter<KEYOUT,VALUEOUT> output,
+                                    OutputCommitter committer,
+                                    MRTaskReporter reporter) {
+    super(conf, taskid, reporter);
+    this.output = output;
+    this.committer = committer;
+  }
+
+  /**
+   * Advance to the next key, value pair, returning null if at end.
+   * @return the key object that was read into, or null if no more
+   */
+  public abstract 
+  boolean nextKeyValue() throws IOException, InterruptedException;
+ 
+  /**
+   * Get the current key.
+   * @return the current key object or null if there isn't one
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+  KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+  /**
+   * Get the current value.
+   * @return the value object that was read into
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN getCurrentValue() throws IOException, 
+                                                   InterruptedException;
+
+  /**
+   * Generate an output key/value pair.
+   */
+  public void write(KEYOUT key, VALUEOUT value
+                    ) throws IOException, InterruptedException {
+    output.write(key, value);
+  }
+
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native
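
A minimal hypothetical subclass, showing what the three abstract methods must
supply (MapContextImpl earlier in this commit is the real example); it serves
exactly one hard-coded record:

    class SingleRecordContext
        extends TaskInputOutputContextImpl<Text, Text, Text, Text> {
      private boolean consumed = false;
      SingleRecordContext(Configuration conf, TaskAttemptID id,
          RecordWriter<Text, Text> out, OutputCommitter committer,
          MRTaskReporter reporter) {
        super(conf, id, out, committer, reporter);
      }
      public boolean nextKeyValue() { return !consumed && (consumed = true); }
      public Text getCurrentKey()   { return new Text("k"); }
      public Text getCurrentValue() { return new Text("v"); }
    }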

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class ProceedToCompletionResponse implements Writable {
+
+  private boolean shouldDie;
+  private boolean readyToProceed;
+
+  public ProceedToCompletionResponse() {
+  }
+  
+  public ProceedToCompletionResponse(boolean shouldDie, boolean readyToProceed) {
+    this.shouldDie = shouldDie;
+    this.readyToProceed = readyToProceed;
+  }
+
+  /**
+   * Indicates whether the task is required to proceed to completion, or should
+   * terminate.
+   * 
+   * @return true if the task should terminate, false if it should proceed
+   *         to completion
+   */
+  public boolean shouldDie() {
+    return this.shouldDie;
+  }
+  
+  /**
+   * Indicates whether the task is ready to proceed. Valid only if shouldDie is
+   * false.
+   * 
+   * @return true if the task may proceed to completion; only meaningful when
+   *         {@link #shouldDie()} is false
+   */
+  public boolean readyToProceed() {
+    return this.readyToProceed;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    out.writeBoolean(readyToProceed);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    readyToProceed = in.readBoolean();
+  }
+
+  @Override
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", readyToProceed: " + readyToProceed;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/records/ProceedToCompletionResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native
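
Since this is a plain Writable, a byte-stream round trip exercises the whole
class; a self-contained sketch using only java.io:

    ProceedToCompletionResponse out =
        new ProceedToCompletionResponse(false, true);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buf));

    ProceedToCompletionResponse in = new ProceedToCompletionResponse();
    in.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(in);    // shouldDie: false, readyToProceed: true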

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link SimpleInput} is an {@link Input} which provides key/value pairs
+ * for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce 
+ * {@link InputFormat} implementations.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class SimpleInput implements Input {
+
+  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+  
+  MRTask task;
+  
+  boolean useNewApi;
+  
+  JobConf jobConf;
+  
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  
+  org.apache.hadoop.mapred.InputFormat oldInputFormat;
+  org.apache.hadoop.mapred.RecordReader oldRecordReader;
+
+  Object key;
+  Object value;
+  
+  private TezCounter inputRecordCounter;
+  private TezCounter fileInputByteCounter; 
+  private List<Statistics> fsStats;
+  private MRTaskReporter reporter;
+
+  @Inject
+  public SimpleInput(@Assisted TezTask task) {
+  }
+  
+  public void setTask(MRTask task) {
+    this.task = task;
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    if (task == null) {
+      return;
+    }
+    
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+    
+    useNewApi = jobConf.getUseNewMapper();
+    taskAttemptContext = task.getTaskAttemptContext();
+    
+    inputRecordCounter = task.getInputRecordsCounter();
+    fileInputByteCounter = task.getFileInputBytesCounter();
+
+    reporter = task.getMRReporter();
+
+    if (useNewApi) {
+      try {
+        newInputFormat = 
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getInputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+      
+      newInputSplit = getNewSplitDetails(task.getSplitIndex());
+      List<Statistics> matchedStats = null;
+      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = MRTask.getFsStatistics(
+            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+                newInputSplit).getPath(), jobConf);
+      }
+      fsStats = matchedStats;
+      newRecordReader = 
+          newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+    } else {
+      oldInputFormat = jobConf.getInputFormat();
+      org.apache.hadoop.mapred.InputSplit oldInputSplit =
+          getOldSplitDetails(task.getSplitIndex());
+      
+      List<Statistics> matchedStats = null;
+      if (oldInputSplit instanceof FileSplit) {
+        matchedStats = 
+            MRTask.getFsStatistics(
+                ((FileSplit)oldInputSplit).getPath(), jobConf);
+      }
+      fsStats = matchedStats;
+
+      long bytesInPrev = getInputBytes();
+      oldRecordReader = 
+          jobConf.getInputFormat().getRecordReader(
+              oldInputSplit, jobConf, reporter);
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+
+      updateJobWithSplit(jobConf, oldInputSplit);
+    }
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    boolean hasNext = false;
+    long bytesInPrev = getInputBytes();
+
+    if (useNewApi) {
+      hasNext = newRecordReader.nextKeyValue();
+    } else {
+      hasNext = oldRecordReader.next(key, value);
+    }
+    
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    reporter.setProgress(getProgress());
+
+    if (hasNext) {
+      inputRecordCounter.increment(1);
+    }
+    
+    return hasNext;
+  }
+
+  private SimpleValueIterator vIter = new SimpleValueIterator();
+  private SimpleIterable valuesIterable =
+      new SimpleIterable(vIter);
+
+  private org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+
+  public void setKey(Object key) {
+    this.key = key;
+  }
+  
+  public void setValue(Object value) {
+    this.value = value;
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getCurrentKey();
+    } else {
+      return key;
+    }
+  }
+
+  public Iterable getNextValues() throws IOException,
+      InterruptedException {
+    if (useNewApi) {
+      value = newRecordReader.getCurrentValue();
+    }
+    // with the old API, 'value' was already filled by next(key, value) in hasNext()
+    vIter.setValue(value);
+    return valuesIterable;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getProgress();
+    } else {
+      return oldRecordReader.getProgress();
+    }
+  }
+
+  public void close() throws IOException {
+    long bytesInPrev = getInputBytes();
+    if (useNewApi) {
+      newRecordReader.close();
+    } else {
+      oldRecordReader.close();
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+  }
+
+  static class SimpleValueIterator implements Iterator {
+
+    private Object value;
+    
+    public void setValue(Object value) {
+      this.value = value;
+    }
+    
+    public boolean hasNext() {
+      // a single value is available until next() hands it out
+      return value != null;
+    }
+
+    public Object next() {
+      Object value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class SimpleIterable implements Iterable {
+    private final Iterator iterator;
+    public SimpleIterable(Iterator iterator) {
+      this.iterator = iterator;
+    }
+    
+    public Iterator iterator() {
+      return iterator;
+    }
+  }
+  
+
+  public RecordReader getOldRecordReader() {
+    return oldRecordReader;
+  }
+  
+  public org.apache.hadoop.mapreduce.RecordReader getNewRecordReader() {
+    return newRecordReader;
+  }
+  
+  public org.apache.hadoop.mapred.InputSplit 
+  getOldSplitDetails(TaskSplitIndex splitMetaInfo) 
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    long offset = splitMetaInfo.getStartOffset();
+    
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapred.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapred.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    reporter.getCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  public org.apache.hadoop.mapreduce.InputSplit 
+  getNewSplitDetails(TaskSplitIndex splitMetaInfo) 
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    long offset = splitMetaInfo.getStartOffset();
+    
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapreduce.InputSplit split = 
+        deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    reporter.getCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
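+      // Note: JobContext.MAP_INPUT_PATH resolves to "mapreduce.map.input.length",
+      // so despite the constant's name this records the split length, not a path.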
+      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+
+  private long getInputBytes() {
+    if (fsStats == null) return 0;
+    long bytesRead = 0;
+    for (Statistics stat: fsStats) {
+      bytesRead = bytesRead + stat.getBytesRead();
+    }
+    return bytesRead;
+  }
+
+  public void initializeNewRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    newRecordReader.initialize(split, context);
+  }
+  
+  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
+    return newInputSplit;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
------------------------------------------------------------------------------
    svn:eol-style = native
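
A hedged consumer sketch against the API above, assuming setTask(...) and
initialize(conf, master) have already run; process(..) is a placeholder, and
the loop relies on getNextValues() yielding the single current value:

    while (input.hasNext()) {          // also updates byte/record counters
      Object key = input.getNextKey();
      for (Object value : input.getNextValues()) {
        process(key, value);
      }
    }
    input.close();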

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link SimpleOutput} is an {@link Output} which persists key/value pairs
+ * written to it. 
+ * 
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * {@link org.apache.hadoop.mapreduce.OutputFormat} implementations, in both
+ * the mapred and mapreduce APIs.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class SimpleOutput implements Output {
+
+  private MRTask task;
+  
+  boolean useNewApi;
+  JobConf jobConf;
+  
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+  
+  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+  
+  private TezCounter outputRecordCounter;
+  private TezCounter fileOutputByteCounter; 
+  private List<Statistics> fsStats;
+  private MRTaskReporter reporter;
+  
+  @Inject
+  public SimpleOutput(@Assisted TezTask task) {
+  }
+  
+  public void setTask(MRTask task) {
+    this.task = task;
+  }
+  
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+
+    if (task == null) {
+      return;
+    }
+    
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+    
+    useNewApi = jobConf.getUseNewMapper();
+    taskAttemptContext = task.getTaskAttemptContext();
+    
+    outputRecordCounter = task.getOutputRecordsCounter();
+    fileOutputByteCounter = task.getFileOutputBytesCounter();
+
+    reporter = task.getMRReporter();
+    
+    if (useNewApi) {
+      try {
+        newOutputFormat =
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getOutputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+      
+      List<Statistics> matchedStats = null;
+      if (newOutputFormat instanceof 
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats = 
+            MRTask.getFsStatistics(
+                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+                    .getOutputPath(taskAttemptContext), 
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes();
+      newRecordWriter = 
+          newOutputFormat.getRecordWriter(this.taskAttemptContext);
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    } else {
+      oldOutputFormat = jobConf.getOutputFormat();
+      
+      List<Statistics> matchedStats = null;
+      if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+        matchedStats = 
+            MRTask.getFsStatistics(
+                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
+                    jobConf), 
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = task.getOutputName();
+
+      long bytesOutPrev = getOutputBytes();
+      oldRecordWriter = 
+          oldOutputFormat.getRecordWriter(
+              fs, jobConf, finalName, reporter);
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+  }
+  
+  public void write(Object key, Object value) 
+      throws IOException, InterruptedException {
+
+    reporter.progress();
+    long bytesOutPrev = getOutputBytes();
+  
+    if (useNewApi) {
+      newRecordWriter.write(key, value);
+    } else {
+      oldRecordWriter.write(key, value);
+    }
+    
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    outputRecordCounter.increment(1);
+
+  }
+
+  public void close() throws IOException, InterruptedException {
+    reporter.progress();
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      newRecordWriter.close(taskAttemptContext);
+    } else {
+      oldRecordWriter.close(null);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+  }
+
+  public org.apache.hadoop.mapreduce.OutputFormat getNewOutputFormat() {
+    return newOutputFormat;
+  }
+  
+  public org.apache.hadoop.mapred.OutputFormat getOldOutputFormat() {
+    return oldOutputFormat;
+  }
+  
+  private long getOutputBytes() {
+    if (fsStats == null) return 0;
+    long bytesWritten = 0;
+    for (Statistics stat: fsStats) {
+      bytesWritten = bytesWritten + stat.getBytesWritten();
+    }
+    return bytesWritten;
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native
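
The matching producer-side sketch, again assuming initialize(conf, master)
has run; Text/IntWritable are illustrative key/value types:

    output.write(new Text("word"), new IntWritable(1)); // bumps byte + record counters
    output.close();    // closes the wrapped RecordWriter (new or old API)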

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.partition;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.mapreduce.processor.MRTask;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
+
+  static final Log LOG = LogFactory.getLog(MRPartitioner.class);
+  private final MRTask task;
+  
+  JobConf jobConf;
+  boolean useNewApi;
+  
+  org.apache.hadoop.mapred.Partitioner oldPartitioner;
+  org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+
+  public MRPartitioner(MRTask task) {
+    this.task = task;
+  }
+  
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+    
+    useNewApi = jobConf.getUseNewMapper();
+    final int partitions = jobConf.getNumReduceTasks();
+    if (useNewApi) {
+      if (partitions > 1) {
+        try {
+          newPartitioner = (org.apache.hadoop.mapreduce.Partitioner)
+            ReflectionUtils.newInstance(
+                task.getJobContext().getPartitionerClass(), jobConf);
+        } catch (ClassNotFoundException cnfe) {
+          throw new IOException(cnfe);
+        }
+      } else {
+        newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+    } else {
+      if (partitions > 1) {
+        oldPartitioner = (Partitioner)
+          ReflectionUtils.newInstance(jobConf.getPartitionerClass(), jobConf);
+      } else {
+        oldPartitioner = new Partitioner() {
+          @Override
+          public void configure(JobConf job) {}
+          
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+
+    }
+
+  }
+  
+  @Override
+  public int getPartition(Object key, Object value, int numPartitions) {
+    if (useNewApi) {
+      return newPartitioner.getPartition(key, value, numPartitions);
+    } else {
+      return oldPartitioner.getPartition(key, value, numPartitions);
+    }
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
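
A usage sketch, with task, jobConf and master standing in for runtime
objects; note the single-reduce fast path above returns numPartitions - 1,
i.e. partition 0:

    MRPartitioner partitioner = new MRPartitioner(task);
    partitioner.initialize(jobConf, master);  // picks old- vs. new-API delegate
    int reduce = partitioner.getPartition(key, value,
        jobConf.getNumReduceTasks());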

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.processor;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+  /**
+   * An updater that tracks the last number reported for a given file
+   * system and only creates the counters when they are needed.
+   */
+  class FileSystemStatisticUpdater {
+    private List<FileSystem.Statistics> stats;
+    private TezCounter readBytesCounter, writeBytesCounter,
+        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
+    private String scheme;
+    private TezCounters counters;
+
+    FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
+      this.stats = stats;
+      this.scheme = scheme;
+      this.counters = counters;
+    }
+
+    void updateCounters() {
+      if (readBytesCounter == null) {
+        readBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_READ);
+      }
+      if (writeBytesCounter == null) {
+        writeBytesCounter = counters.findCounter(scheme,
+            FileSystemCounter.BYTES_WRITTEN);
+      }
+      if (readOpsCounter == null) {
+        readOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.READ_OPS);
+      }
+      if (largeReadOpsCounter == null) {
+        largeReadOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.LARGE_READ_OPS);
+      }
+      if (writeOpsCounter == null) {
+        writeOpsCounter = counters.findCounter(scheme,
+            FileSystemCounter.WRITE_OPS);
+      }
+      long readBytes = 0;
+      long writeBytes = 0;
+      long readOps = 0;
+      long largeReadOps = 0;
+      long writeOps = 0;
+      for (FileSystem.Statistics stat: stats) {
+        readBytes = readBytes + stat.getBytesRead();
+        writeBytes = writeBytes + stat.getBytesWritten();
+        readOps = readOps + stat.getReadOps();
+        largeReadOps = largeReadOps + stat.getLargeReadOps();
+        writeOps = writeOps + stat.getWriteOps();
+      }
+      readBytesCounter.setValue(readBytes);
+      writeBytesCounter.setValue(writeBytes);
+      readOpsCounter.setValue(readOps);
+      largeReadOpsCounter.setValue(largeReadOps);
+      writeOpsCounter.setValue(writeOps);
+    }
+  }
+  

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
------------------------------------------------------------------------------
    svn:eol-style = native
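
A note on the class above: the counter fields are looked up lazily on first use,
and updateCounters() publishes with setValue() rather than increment(), so
re-publishing the cumulative FileSystem.Statistics totals never double counts.
Below is a sketch of the per-scheme wiring, mirroring what MRTask.updateCounters()
does later in this commit; the driver class and its refreshAll() method are
illustrative names, and the sketch assumes it sits in the same package as the
(package-private) updater:

    package org.apache.tez.mapreduce.processor;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.tez.common.counters.TezCounters;

    class StatisticUpdaterDriverSketch {
      // One updater per URI scheme, created the first time the scheme appears.
      private final Map<String, FileSystemStatisticUpdater> updaters =
          new HashMap<String, FileSystemStatisticUpdater>();

      void refreshAll(TezCounters counters) {
        // Group the JVM-wide statistics by scheme (hdfs, file, ...).
        Map<String, List<FileSystem.Statistics>> byScheme =
            new HashMap<String, List<FileSystem.Statistics>>();
        for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
          List<FileSystem.Statistics> list = byScheme.get(stat.getScheme());
          if (list == null) {
            list = new ArrayList<FileSystem.Statistics>();
            byScheme.put(stat.getScheme(), list);
          }
          list.add(stat);
        }
        // Create missing updaters, then push the latest totals into counters.
        for (Map.Entry<String, List<FileSystem.Statistics>> e : byScheme.entrySet()) {
          FileSystemStatisticUpdater updater = updaters.get(e.getKey());
          if (updater == null) {
            updater = new FileSystemStatisticUpdater(
                counters, e.getValue(), e.getKey());
            updaters.put(e.getKey(), updater);
          }
          updater.updateCounters();
        }
      }
    }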

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.processor;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.counters.TaskCounter;
+
+  /**
+   * An updater that tracks the amount of time this task has spent in GC.
+   */
+  class GcTimeUpdater {
+    private long lastGcMillis = 0;
+    private List<GarbageCollectorMXBean> gcBeans = null;
+    TezCounters counters;
+
+    public GcTimeUpdater(TezCounters counters) {
+      this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+      getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
+      this.counters = counters;
+    }
+
+    /**
+     * @return the number of milliseconds that the gc has used for CPU
+     * since the last time this method was called.
+     */
+    protected long getElapsedGc() {
+      long thisGcMillis = 0;
+      for (GarbageCollectorMXBean gcBean : gcBeans) {
+        thisGcMillis += gcBean.getCollectionTime();
+      }
+
+      long delta = thisGcMillis - lastGcMillis;
+      this.lastGcMillis = thisGcMillis;
+      return delta;
+    }
+
+    /**
+     * Increment the gc-elapsed-time counter.
+     */
+    public void incrementGcCounter() {
+      if (null == counters) {
+        return; // nothing to do.
+      }
+
+      TezCounter gcCounter =
+        counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+      if (null != gcCounter) {
+        gcCounter.increment(getElapsedGc());
+      }
+    }
+  }

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
------------------------------------------------------------------------------
    svn:eol-style = native
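
Since getElapsedGc() above returns the delta since the previous reading (the
constructor takes an initial baseline), repeated incrementGcCounter() calls
accumulate total GC time without double counting. A small usage sketch under that
reading: the driver class is hypothetical, it assumes the same package as the
(package-private) GcTimeUpdater, getValue() is assumed to mirror the setValue()
and increment() accessors used elsewhere in this commit, and System.gc() is only
a hint, so the printed value may well be 0:

    package org.apache.tez.mapreduce.processor;

    import org.apache.tez.common.counters.TaskCounter;
    import org.apache.tez.common.counters.TezCounters;

    class GcTimeUpdaterUsageSketch {
      public static void main(String[] args) {
        TezCounters counters = new TezCounters();
        GcTimeUpdater updater = new GcTimeUpdater(counters); // baselines lastGcMillis

        System.gc(); // may or may not trigger a collection
        updater.incrementGcCounter(); // adds GC time since construction

        System.gc();
        updater.incrementGcCounter(); // adds only GC time since the previous call

        System.out.println(
            counters.findCounter(TaskCounter.GC_TIME_MILLIS).getValue());
      }
    }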

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,716 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.TezTaskStatus.Phase;
+import org.apache.tez.common.TezTaskStatus.State;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
+
+public abstract class MRTask 
+extends org.apache.tez.common.TezTask {
+
+  static final Log LOG = LogFactory.getLog(MRTask.class);
+  
+  protected JobConf jobConf;
+  protected JobContext jobContext;
+  protected TaskAttemptContext taskAttemptContext;
+  protected OutputCommitter committer;
+  
+  // Current counters
+  transient TezCounters counters = new TezCounters();
+  protected GcTimeUpdater gcUpdater;
+  private ResourceCalculatorProcessTree pTree;
+  private long initCpuCumulativeTime = 0;
+  protected TezEngineTask tezTaskContext;
+  
+  /* flag to track whether task is done */
+  AtomicBoolean taskDone = new AtomicBoolean(false);
+  
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+                          
+  private final static int MAX_RETRIES = 10;
+
+  /** The number of milliseconds between progress reports. */
+  public static final int PROGRESS_INTERVAL = 3000;
+
+  private MRTaskReporter mrReporter;
+  
+  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+  
+  /**
+   * A map from URI scheme to the FileSystemStatisticUpdater that tracks counters for that scheme.
+   */
+  private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+     new HashMap<String, FileSystemStatisticUpdater>();
+
+  public MRTask(TezTask context) {
+    super(context.getTaskAttemptId(), context.getUser(), context.getJobName(),
+        ((TezEngineTask)context).getTaskModuleClassName());
+    
+    tezTaskContext = (TezEngineTask) context;
+    // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
+    // Output. Phase is MR specific.
+    status =
+        new MRTaskStatus(
+            getTaskAttemptId(),
+            counters,
+            (getTaskAttemptId().getTaskID().getVertexID().getId() == 0 ? 
+                Phase.MAP : Phase.SHUFFLE)
+        );
+    gcUpdater = new GcTimeUpdater(counters);
+  }
+
+  public void initialize(Configuration conf, Master master)
+      throws IOException, InterruptedException {
+
+    if (conf instanceof JobConf) {
+      this.jobConf = (JobConf)conf;
+    } else {
+      this.jobConf = new JobConf(conf);
+    }
+    reporter = 
+        new TezTaskReporterImpl(this, (TezTaskUmbilicalProtocol)master);
+    ((TezTaskReporterImpl)reporter).startCommunicationThread();
+    
+    jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, 
+        getTaskAttemptId().toString());
+    
+    initResourceCalculatorPlugin();
+    
+    LOG.info("MRTask.inited: taskAttemptId = " + getTaskAttemptId().toString());
+  }
+
+  private void initResourceCalculatorPlugin() {
+    Class<? extends ResourceCalculatorProcessTree> clazz =
+        this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
+            null, ResourceCalculatorProcessTree.class);
+    pTree = ResourceCalculatorProcessTree
+        .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
+    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+    if (pTree != null) {
+      pTree.updateProcessTree();
+      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
+    }
+  }
+
+  public TezTaskUmbilicalProtocol getUmbilical() {
+    return ((TezTaskReporterImpl)reporter).getUmbilical();
+  }
+  
+  public void initTask(JobConf job, TezDAGID dagId,
+      MRTaskReporter mrReporter,
+      boolean useNewApi) throws IOException,
+                                InterruptedException {
+    this.jobConf = job;
+    this.jobContext = new JobContextImpl(job, dagId, mrReporter);
+    this.taskAttemptContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), mrReporter);
+    this.mrReporter = mrReporter;
+
+    if (getState() == State.UNASSIGNED) {
+      setState(State.RUNNING);
+    }
+
+    // Save the committer
+    if (useNewApi) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("using new api for output committer");
+      }
+      OutputFormat<?, ?> outputFormat = null;
+      try {
+        outputFormat =
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getOutputFormatClass(), job);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown OutputFormat", cnfe);
+      }
+      setCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
+    } else {
+      setCommitter(job.getOutputCommitter());
+    }
+    
+    Path outputPath = FileOutputFormat.getOutputPath(job);
+    if (outputPath != null) {
+      if ((getCommitter() instanceof FileOutputCommitter)) {
+        FileOutputFormat.setWorkOutputPath(job, 
+            ((FileOutputCommitter)getCommitter()).getTaskAttemptPath(taskAttemptContext));
+      } else {
+        FileOutputFormat.setWorkOutputPath(job, outputPath);
+      }
+    }
+    getCommitter().setupTask(taskAttemptContext);
+    
+    partitioner = new MRPartitioner(this);
+    ((MRPartitioner)partitioner).initialize(job, getTaskReporter());
+
+    localizeConfiguration(jobConf);
+  }
+
+  public MRTaskReporter getMRReporter() {
+    return mrReporter;
+  }
+
+  public void setState(State state) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  public State getState() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public OutputCommitter getCommitter() {
+    return committer;
+  }
+
+  public void setCommitter(OutputCommitter committer) {
+    this.committer = committer;
+  }
+
+  public TezCounters getCounters() { return counters; }
+  
+  /**
+   * Returns the current phase of the task.
+   * Needs to be synchronized as the communication thread sends the phase every second.
+   * @return the current phase of the task
+   */
+  public synchronized TezTaskStatus.Phase getPhase() {
+    return status.getPhase(); 
+  }
+  
+  /**
+   * Sets the current phase of the task.
+   * @param phase task phase
+   */
+  protected synchronized void setPhase(TezTaskStatus.Phase phase) {
+    status.setPhase(phase); 
+  }
+
+  public void setConf(JobConf jobConf) {
+    this.jobConf = jobConf;
+  }
+
+  public JobConf getConf() {
+    return this.jobConf;
+  }
+
+  /**
+   * Gets the Statistics instances whose scheme matches the scheme of the
+   * given path.
+   *
+   * @param path the path.
+   * @param conf the configuration used to qualify the path and resolve its
+   *   file system.
+   * @return the matching Statistics instances; an empty list if none match.
+   */
+  @Private
+  public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+    List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
+    path = path.getFileSystem(conf).makeQualified(path);
+    String scheme = path.toUri().getScheme();
+    for (Statistics stats : FileSystem.getAllStatistics()) {
+      if (stats.getScheme().equals(scheme)) {
+        matchedStats.add(stats);
+      }
+    }
+    return matchedStats;
+  }
+
+  @Private
+  public synchronized String getOutputName() {
+    return "part-" + NUMBER_FORMAT.format(getTaskAttemptId().getTaskID().getId());
+  }
+ 
+  public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
+      InterruptedException {
+    TezTaskUmbilicalProtocol umbilical = getUmbilical();
+    int retries = MAX_RETRIES;
+    boolean readyToProceed = false;
+    while (!readyToProceed) {
+      try {
+        ProceedToCompletionResponse response =
+            umbilical.proceedToCompletion(getTaskAttemptId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got readyToProceed: " + response);
+        }
+        if (response.shouldDie()) {
+          throw new IOException("Task was asked to die by the AM");
+          // TODO EVENTUALLY Figure out a good way for a graceful exit, instead
+          // of an exit via an Exception. This isn't necessarily an error.
+        }
+        readyToProceed = response.readyToProceed();
+      } catch (IOException ie) {
+        LOG.warn("Failure waiting for exit signal: " +
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+      synchronized (this) {
+        wait(1000L); // Check if ready to exit every second.
+      }
+    }
+  }
+
+  public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
+      throws IOException,
+      InterruptedException {
+    LOG.info("Task: " + getTaskAttemptId() + " reporting outputReady");
+    updateCounters();
+    statusUpdate();
+
+    TezTaskUmbilicalProtocol umbilical = getUmbilical();
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        umbilical.outputReady(getTaskAttemptId(), outputContext);
+        LOG.info("Task '" + getTaskAttemptId() + "' reported outputReady.");
+        return;
+      } catch (IOException ie) {
+        LOG.warn("Failure signalling outputReady: " +
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+  }
+
+  public void done(
+      OutputContext outputContext,
+      MRTaskReporter reporter
+      ) throws IOException, InterruptedException {
+    updateCounters();
+    if (outputContext != null) {
+      LOG.info("Task: "
+          + getTaskAttemptId()
+          + " is done."
+          + " And is in the process of sending output-context with shuffle port: "
+          + outputContext.getShufflePort());
+      outputReady(reporter, outputContext);
+      waitBeforeCompletion(reporter);
+    }
+
+    LOG.info("Task:" + getTaskAttemptId() + " is done."
+        + " And is in the process of committing");
+    TezTaskUmbilicalProtocol umbilical = getUmbilical();
+    // TODO TEZ Interaction between Commit and OutputReady. Merge?
+    if (isCommitRequired()) {
+      int retries = MAX_RETRIES;
+      setState(TezTaskStatus.State.COMMIT_PENDING);
+      // tell the task tracker that the task is commit pending
+      // TODO TEZAM2 - Why is the commitRequired check missing ?
+      while (true) {
+        try {
+          umbilical.commitPending(getTaskAttemptId(), status);
+          break;
+        } catch (InterruptedException ie) {
+          // ignore
+        } catch (IOException ie) {
+          LOG.warn("Failure sending commit pending: " + 
+              StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            System.exit(67);
+          }
+        }
+      }
+      //wait for commit approval and commit
+      // TODO EVENTUALLY - Commit is not required for map tasks. skip a couple of RPCs before exiting.
+      commit(umbilical, reporter, committer);
+    }
+    taskDone.set(true);
+    reporter.stopCommunicationThread();
+    // Make sure we send at least one set of counter increments. It's
+    // ok to call updateCounters() in this thread after comm thread stopped.
+    updateCounters();
+    sendLastUpdate();
+    //signal the tasktracker that we are done
+    sendDone(umbilical);
+  }
+
+  
+  private boolean isCommitRequired() throws IOException {
+    return committer.needsTaskCommit(taskAttemptContext);
+  }
+
+  /**
+   * Sends a status update to the task tracker.
+   * @throws IOException
+   */
+  public void statusUpdate() throws IOException, InterruptedException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        if (!getUmbilical().statusUpdate(getTaskAttemptId(), status)) {
+          LOG.warn("Parent died.  Exiting " + getTaskAttemptId());
+          System.exit(66);
+        }
+        status.clearStatus();
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // interrupt ourself
+      } catch (IOException ie) {
+        LOG.warn("Failure sending status update: " + 
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+  }
+
+  /**
+   * Sends the last status update before calling umbilical.done().
+   */
+  private void sendLastUpdate() 
+      throws IOException, InterruptedException {
+    status.setOutputSize(-1L);
+    // send a final status report
+    status.statusUpdate(
+        getProgress().get(), getProgress().toString(), counters);
+    statusUpdate();
+  }
+
+  private void commit(TezTaskUmbilicalProtocol umbilical,
+      MRTaskReporter reporter,
+      org.apache.hadoop.mapreduce.OutputCommitter committer
+      ) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        while (!umbilical.canCommit(getTaskAttemptId())) {
+          // This will loop until the AM asks for the task to be killed, as
+          // opposed to the AM sending a signal to the task to kill itself
+          // gracefully.
+          try {
+            Thread.sleep(1000);
+          } catch(InterruptedException ie) {
+            //ignore
+          }
+          reporter.setProgressFlag();
+        }
+        break;
+      } catch (IOException ie) {
+        LOG.warn("Failure asking whether task can commit: " + 
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          // if it couldn't query successfully, delete the output
+          discardOutput(taskAttemptContext);
+          System.exit(68);
+        }
+      }
+    }
+
+    // task can commit now
+    try {
+      LOG.info("Task " + getTaskAttemptId() + " is allowed to commit now");
+      committer.commitTask(taskAttemptContext);
+      return;
+    } catch (IOException iee) {
+      LOG.warn("Failure committing: " + 
+          StringUtils.stringifyException(iee));
+      // if the commit didn't succeed, delete the output
+      discardOutput(taskAttemptContext);
+      throw iee;
+    }
+  }
+
+  private void discardOutput(TaskAttemptContext taskContext) {
+    try {
+      committer.abortTask(taskContext);
+    } catch (IOException ioe)  {
+      LOG.warn("Failure cleaning up: " + 
+               StringUtils.stringifyException(ioe));
+    }
+  }
+
+
+  private void sendDone(TezTaskUmbilicalProtocol umbilical) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        umbilical.done(getTaskAttemptId());
+        LOG.info("Task '" + getTaskAttemptId() + "' done.");
+        return;
+      } catch (IOException ie) {
+        LOG.warn("Failure signalling completion: " + 
+                 StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+  }
+
+  public void updateCounters() {
+    // TODO TEZAM Implement.
+    Map<String, List<FileSystem.Statistics>> map = new 
+        HashMap<String, List<FileSystem.Statistics>>();
+    for(Statistics stat: FileSystem.getAllStatistics()) {
+      String uriScheme = stat.getScheme();
+      if (map.containsKey(uriScheme)) {
+        List<FileSystem.Statistics> list = map.get(uriScheme);
+        list.add(stat);
+      } else {
+        List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
+        list.add(stat);
+        map.put(uriScheme, list);
+      }
+    }
+    for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
+      if (updater == null) { // first time this scheme has been seen
+        updater =
+            new FileSystemStatisticUpdater(counters, entry.getValue(),
+                entry.getKey());
+        statisticUpdaters.put(entry.getKey(), updater);
+      }
+      updater.updateCounters();
+    }
+    
+    gcUpdater.incrementGcCounter();
+    updateResourceCounters();
+  }
+
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
+  }
+
+  /**
+   * Update resource information counters
+   */
+  void updateResourceCounters() {
+    // Update generic resource counters
+    updateHeapUsageCounter();
+
+    // Updating resources specified in ResourceCalculatorPlugin
+    if (pTree == null) {
+      return;
+    }
+    pTree.updateProcessTree();
+    long cpuTime = pTree.getCumulativeCpuTime();
+    long pMem = pTree.getCumulativeRssmem();
+    long vMem = pTree.getCumulativeVmem();
+    // Remove the CPU time consumed previously by JVM reuse
+    cpuTime -= initCpuCumulativeTime;
+    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
+    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+  }
+  
+
+  public static String normalizeStatus(String status, Configuration conf) {
+    // Check to see if the status string is too long
+    // and truncate it if needed.
+    int progressStatusLength = conf.getInt(
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+    if (status.length() > progressStatusLength) {
+      LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
+          + progressStatusLength + " characters)");
+      status = status.substring(0, progressStatusLength);
+    }
+    return status;
+  }
+  
+  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
+  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+  createReduceContext(org.apache.hadoop.mapreduce.Reducer
+                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+                      Configuration job,
+                      TezTaskAttemptID taskId, 
+                      final TezRawKeyValueIterator rIter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
+                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
+                      org.apache.hadoop.mapreduce.OutputCommitter committer,
+                      org.apache.hadoop.mapreduce.StatusReporter reporter,
+                      RawComparator<INKEY> comparator,
+                      Class<INKEY> keyClass, Class<INVALUE> valueClass
+  ) throws IOException, InterruptedException {
+    RawKeyValueIterator r = 
+        new RawKeyValueIterator() {
+          
+          @Override
+          public boolean next() throws IOException {
+            return rIter.next();
+          }
+          
+          @Override
+          public DataInputBuffer getValue() throws IOException {
+            return rIter.getValue();
+          }
+          
+          @Override
+          public Progress getProgress() {
+            return rIter.getProgress();
+          }
+          
+          @Override
+          public DataInputBuffer getKey() throws IOException {
+            return rIter.getKey();
+          }
+          
+          @Override
+          public void close() throws IOException {
+            rIter.close();
+          }
+        };
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
+    reduceContext = 
+      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
+          job, 
+          IDConverter.toMRTaskAttemptId(taskId), 
+          r, 
+          inputKeyCounter, 
+          inputValueCounter, 
+          output, 
+          committer, 
+          reporter, 
+          comparator, 
+          keyClass, 
+          valueClass);
+    LOG.info("DEBUG: Using key class: " + keyClass + ", valueClass: " + valueClass);
+
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+        reducerContext = 
+          new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+              reduceContext);
+
+    return reducerContext;
+  }
+
+  public void taskCleanup(TezTaskUmbilicalProtocol umbilical) 
+      throws IOException, InterruptedException {
+    // set phase for this task
+    setPhase(TezTaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    statusUpdate();
+    LOG.info("Runnning cleanup for the task");
+    // do the cleanup
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void localizeConfiguration(JobConf jobConf) 
+      throws IOException, InterruptedException {
+    jobConf.set(JobContext.TASK_ID, getTaskAttemptId().getTaskID().toString()); 
+    jobConf.set(JobContext.TASK_ATTEMPT_ID, getTaskAttemptId().toString());
+    jobConf.setInt(JobContext.TASK_PARTITION, 
+        getTaskAttemptId().getTaskID().getId());
+    jobConf.set(JobContext.ID, getTaskAttemptId().getTaskID().getVertexID().getDAGId().toString());
+  }
+
+  public abstract TezCounter getOutputRecordsCounter();
+
+  public abstract TezCounter getInputRecordsCounter();
+
+  public TezCounter getFileOutputBytesCounter() {
+    return reporter.getCounter(FileOutputFormatCounter.BYTES_WRITTEN);
+  }
+
+  public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public TezCounter getFileInputBytesCounter() {
+    return reporter.getCounter(FileInputFormatCounter.BYTES_READ);
+  }
+
+  public TaskSplitIndex getSplitIndex() {
+    return splitMetaInfo;
+  }
+
+  public JobContext getJobContext() {
+    return jobContext;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
------------------------------------------------------------------------------
    svn:eol-style = native
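
One idiom worth distilling from MRTask above: every umbilical RPC (statusUpdate,
outputReady, commitPending, sendDone) is wrapped in the same bounded retry loop,
decrementing a counter initialized from MAX_RETRIES and rethrowing (or exiting)
on exhaustion. A standalone sketch of that pattern; the RpcCall interface and
withRetries() helper are illustrative, not part of the commit:

    import java.io.IOException;

    class UmbilicalRetrySketch {
      interface RpcCall {
        void invoke() throws IOException;
      }

      // Retry an idempotent RPC up to maxRetries times, surfacing the last
      // IOException to the caller once the budget is exhausted.
      static void withRetries(RpcCall call, int maxRetries) throws IOException {
        int retries = maxRetries;
        while (true) {
          try {
            call.invoke();
            return;
          } catch (IOException ie) {
            if (--retries == 0) {
              throw ie;
            }
          }
        }
      }
    }

Note that MRTask.done() deviates from the rethrow in one place: exhausted retries
of commitPending end in System.exit(67), since at that point there is no caller
left that could meaningfully recover.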