Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/12 02:45:20 UTC

git commit: TEZ-418. Change SimpleInput to work with the new engine APIs (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 c86e0e40d -> 7974742bf


TEZ-418. Change SimpleInput to work with the new engine APIs (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7974742b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7974742b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7974742b

Branch: refs/heads/TEZ-398
Commit: 7974742bf50eb3427b3b2076c63f0f0714fba8ad
Parents: c86e0e4
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 11 17:44:47 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 11 17:44:47 2013 -0700

----------------------------------------------------------------------
 .../engine/lib/input/ShuffledMergedInput.java   |   2 +-
 .../org/apache/tez/engine/newapi/KVReader.java  |   4 +-
 .../org/apache/tez/mapreduce/common/Utils.java  |  47 +++
 .../mapreduce/hadoop/newmapred/MRReporter.java  |  70 ++++
 .../newmapreduce/TaskAttemptContextImpl.java    |  86 ++++
 .../tez/mapreduce/newinput/SimpleInput.java     | 417 +++++++++++++++++++
 6 files changed, 623 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
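For context, the KVReader rename in this commit changes consumer iteration to the following shape (a minimal sketch, assuming a KVReader obtained from a LogicalInput; it mirrors the updated javadoc example further below):

    while (kvReader.next()) {            // previously kvReader.moveToNext()
      KVRecord kvRecord = kvReader.getCurrentKV();
      Object key = kvRecord.getKey();
      for (Object value : kvRecord.getValues()) {
        // process each key/value pair
      }
    }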


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index fa7054a..5d67b0c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -130,7 +130,7 @@ public class ShuffledMergedInput implements LogicalInput {
     return new KVReader() {
       
       @Override
-      public boolean moveToNext() throws IOException {
+      public boolean next() throws IOException {
         return vIter.moveToNext();
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
index bd0e933..b74c4eb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
@@ -25,7 +25,7 @@ import java.io.IOException;
  * 
  * Example usage
  * <code>
- * while (kvReader.moveToNext()) {
+ * while (kvReader.next()) {
  *   KVRecord kvRecord = getCurrentKV();
  *   Object key =  kvRecord.getKey();
  *   Iterable values = kvRecord.getValues();
@@ -41,7 +41,7 @@ public interface KVReader extends Reader {
    * @throws IOException
    *           if an error occurs
    */
-  public boolean moveToNext() throws IOException;
+  public boolean next() throws IOException;
 
   /**
    * Return the current key/value(s) pair. Use moveToNext() to advance.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
new file mode 100644
index 0000000..f7cd6f0
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
@@ -0,0 +1,47 @@
+package org.apache.tez.mapreduce.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+
+import com.google.common.base.Preconditions;
+
+public class Utils {
+
+  /**
+   * Gets the Statistics instances matching the scheme associated with the
+   * given path.
+   *
+   * @param path the path.
+   * @param conf the configuration used to resolve the path's FileSystem.
+   * @return the matching Statistics instances; the list is empty if none
+   *   match the scheme.
+   */
+  @Private
+  public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
+    List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
+    path = path.getFileSystem(conf).makeQualified(path);
+    String scheme = path.toUri().getScheme();
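+    // More than one registered FileSystem implementation can report the same
+    // scheme (e.g. LocalFileSystem and RawLocalFileSystem both use "file"),
+    // so every matching Statistics object is collected.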
+    for (Statistics stats : FileSystem.getAllStatistics()) {
+      if (stats.getScheme().equals(scheme)) {
+        matchedStats.add(stats);
+      }
+    }
+    return matchedStats;
+  }
+
+  public static Counter getMRCounter(TezCounter tezCounter) {
+    Preconditions.checkNotNull(tezCounter);
+    return new MRCounters.MRCounter(tezCounter);
+  }
+  
+}
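For illustration, a caller can aggregate bytes read across the returned statistics, much as SimpleInput does further down (a hedged sketch; the path is hypothetical):

    List<Statistics> stats =
        Utils.getFsStatistics(new Path("hdfs://namenode/data/part-00000"), conf);
    long bytesRead = 0;
    for (Statistics stat : stats) {
      bytesRead += stat.getBytesRead();
    }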

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
new file mode 100644
index 0000000..4638ab3
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapred/MRReporter.java
@@ -0,0 +1,70 @@
+package org.apache.tez.mapreduce.hadoop.newmapred;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+public class MRReporter implements Reporter {
+
+  private TezTaskContext tezTaskContext;
+  private InputSplit split;
+  
+  public MRReporter(TezTaskContext tezTaskContext) {
+    this(tezTaskContext, null);
+  }
+
+  public MRReporter(TezTaskContext tezTaskContext, InputSplit split) {
+    this.tezTaskContext = tezTaskContext;
+    this.split = split;
+  }
+  
+  @Override
+  public void progress() {
+    // Not reporting progress in Tez.
+  }
+
+  @Override
+  public void setStatus(String status) {
+    // Not setting status string in Tez.
+
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(name));
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    return Utils.getMRCounter(tezTaskContext.getCounters().findCounter(group,
+        name));
+  }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    getCounter(key).increment(amount);
+  }
+
+  @Override
+  public void incrCounter(String group, String counter, long amount) {
+    getCounter(group, counter).increment(amount);
+  }
+
+  @Override
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    if (split == null) {
+      throw new UnsupportedOperationException("Input only available on map");
+    } else {
+      return split;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Does this make a difference to anything?
+    return 0.0f;
+  }
+
+}
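MRReporter is mostly a counter bridge: increments made through the old MapReduce Reporter facade land in the underlying Tez counters (a minimal sketch; the group/counter names are hypothetical):

    Reporter reporter = new MRReporter(tezTaskContext);
    reporter.incrCounter("io", "records-read", 1L);
    // equivalent to:
    // tezTaskContext.getCounters().findCounter("io", "records-read").increment(1L);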

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
new file mode 100644
index 0000000..5b70b31
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop.newmapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+
+// NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not
+// mapreduce). mapred likely does not need its own copy of this class.
+// Meant for use by the "mapreduce" API
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TaskAttemptContextImpl
+       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
+  private TezInputContext inputContext;
+
+  // FIXME we need to use DAG Id but we are using App Id
+  public TaskAttemptContextImpl(Configuration conf, TezInputContext inputContext) {
+    // TODO NEWTEZ Figure out how to compute the TaskType - MAP or REDUCE. For
+    // SimpleInput, it likely doesn't matter - but setting it to MAP
+    // TODO NEWTEZ Can the jt Identifier string be inputContext.getUniqueId ?
+    super(conf, new TaskAttemptID(
+        new TaskID(String.valueOf(inputContext.getApplicationId()
+            .getClusterTimestamp()), inputContext.getApplicationId().getId(),
+            TaskType.MAP, inputContext.getTaskIndex()),
+        inputContext.getAttemptNumber()));
+    this.inputContext = inputContext;
+    
+  }
+  
+  @Override
+  public float getProgress() {
+    // TODO NEWTEZ Will this break anything?
+    return 0.0f;
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return Utils.getMRCounter(inputContext.getCounters().findCounter(counterName));
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return Utils.getMRCounter(inputContext.getCounters().findCounter(groupName, counterName));
+  }
+
+  /**
+   * Report progress.
+   */
+  @Override
+  public void progress() {
+    // Nothing to do.
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  @Override
+  public void setStatus(String status) {
+    setStatusString(status);
+    // Nothing to do until InputContext supports some kind of custom string
+    // diagnostics.
+  }
+}
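For reference, the TaskAttemptID built in the constructor above renders in the standard MapReduce form, with the application's cluster timestamp standing in for the classic jobtracker identifier (the values below are hypothetical):

    attempt_1378947600000_0001_m_000003_0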

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7974742b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
new file mode 100644
index 0000000..616ce35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.newinput;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
+
+/**
+ * {@link SimpleInput} is a {@link LogicalInput} which provides key/value
+ * pairs for the consumer.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce 
+ * {@link InputFormat} implementations.
+ */
+
+public class SimpleInput implements LogicalInput {
+
+  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+  
+  
+  private TezInputContext inputContext;
+  
+  private JobConf jobConf;
+  private Configuration incrementalConf;
+  
+  boolean useNewApi;
+  
+  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  private org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+  
+  @SuppressWarnings("rawtypes")
+  private InputFormat oldInputFormat;
+  @SuppressWarnings("rawtypes")
+  private RecordReader oldRecordReader;
+
+  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+  
+  // Setup the values iterator once, and set value on the same object each time
+  // to prevent lots of objects being created.
+  private SimpleValueIterator valueIterator = new SimpleValueIterator();
+  private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+  private TezCounter inputRecordCounter;
+  private TezCounter fileInputByteCounter; 
+  private List<Statistics> fsStats;
+
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+    
+    // Read split information.
+    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
+    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+    
+    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
+    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
+    // processor. (The processor could provide the counter though)
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
+    
+    useNewApi = this.jobConf.getUseNewMapper();
+
+    if (useNewApi) {
+      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
+      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+      try {
+        inputFormatClazz = taskAttemptContext.getInputFormatClass();
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Unable to instantiate InputFormat class", e);
+      }
+
+      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+
+      newInputSplit = getNewSplitDetails(splitMetaInfo);
+
+      List<Statistics> matchedStats = null;
+      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+        matchedStats = Utils.getFsStatistics(
+            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+                newInputSplit).getPath(), this.jobConf);
+      }
+      fsStats = matchedStats;
+      
+      try {
+        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+        newRecordReader.initialize(newInputSplit, taskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record reader", e);
+      }
+    } else { // OLD API
+      oldInputFormat = this.jobConf.getInputFormat();
+      InputSplit oldInputSplit = getOldSplitDetails(splitMetaInfo);
+
+      List<Statistics> matchedStats = null;
+      if (oldInputSplit instanceof FileSplit) {
+        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
+      }
+      fsStats = matchedStats;
+      
+      long bytesInPrev = getInputBytes();
+      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+          this.jobConf, new MRReporter(inputContext, oldInputSplit));
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      setIncrementalConfigParams(oldInputSplit);
+    }    
+    return null;
+  }
+
+  @Override
+  public KVReader getReader() throws IOException {
+    return new KVReader() {
+      
+      Object key;
+      Object value;
+      
+      @SuppressWarnings("unchecked")
+      @Override
+      public boolean next() throws IOException {
+        boolean hasNext = false;
+        long bytesInPrev = getInputBytes();
+        if (useNewApi) {
+          try {
+            hasNext = newRecordReader.nextKeyValue();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while checking for next key-value", e);
+          }
+        } else {
+          if (key == null) {
+            // Old-API RecordReaders fill caller-supplied objects; allocate
+            // them once via the reader's factory methods.
+            key = oldRecordReader.createKey();
+            value = oldRecordReader.createValue();
+          }
+          hasNext = oldRecordReader.next(key, value);
+        }
+        long bytesInCurr = getInputBytes();
+        fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+        
+        if (hasNext) {
+          inputRecordCounter.increment(1);
+        }
+        
+        return hasNext;
+      }
+
+      @Override
+      public KVRecord getCurrentKV() throws IOException {
+        KVRecord kvRecord = null;
+        if (useNewApi) {
+          try {
+            valueIterator.setValue(newRecordReader.getCurrentValue());
+            kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while fetching next key-value", e);
+          }
+          
+        } else {
+          valueIterator.setValue(value);
+          kvRecord = new KVRecord(key, valueIterable);
+        }
+        return kvRecord;
+      }
+    };
+  }
+
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    // Not required at the moment. May be required if splits are sent via events.
+  }
+
+  public List<Event> close() throws IOException {
+    long bytesInPrev = getInputBytes();
+    if (useNewApi) {
+      newRecordReader.close();
+    } else {
+      oldRecordReader.close();
+    }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    
+    return null;
+  }
+
+  /**
+   * SimpleInput sets some additional configuration parameters, such as the
+   * split location, when using the old MapReduce API. This method returns
+   * those additional updates, and should be used by Processors working with
+   * the old MapReduce API via SimpleInput.
+   * 
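+   * A consuming Processor might fold the updates into its own configuration
+   * as follows (a hypothetical sketch):
+   * <pre>
+   *   Configuration updates = simpleInput.getConfigUpdates();
+   *   if (updates != null) {
+   *     for (Map.Entry&lt;String, String&gt; entry : updates) {
+   *       processorConf.set(entry.getKey(), entry.getValue());
+   *     }
+   *   }
+   * </pre>
+   *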
+   * @return the additional fields set by SimpleInput
+   */
+  public Configuration getConfigUpdates() {
+    // incrementalConf is only populated on the old-API path, for FileSplits.
+    if (incrementalConf == null) {
+      return null;
+    }
+    return new Configuration(incrementalConf);
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    if (useNewApi) {
+      return newRecordReader.getProgress();
+    } else {
+      return oldRecordReader.getProgress();
+    }
+  }
+
+  
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, inputContext);
+  }
+  
+
+  private static class SimpleValueIterator implements Iterator<Object> {
+
+    private Object value;
+    int nextCount = 0;
+
+    public void setValue(Object value) {
+      this.value = value;
+      // Reset the iterator so each new value can be consumed exactly once.
+      this.nextCount = 0;
+    }
+
+    public boolean hasNext() {
+      return nextCount == 0;
+    }
+
+    public Object next() {
+      nextCount++;
+      Object value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class SimpleIterable implements Iterable<Object> {
+    private final Iterator<Object> iterator;
+    public SimpleIterable(Iterator<Object> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public Iterator<Object> iterator() {
+      return iterator;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+      throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    long offset = splitMetaInfo.getStartOffset();
+    
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapred.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapred.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+      TaskSplitIndex splitMetaInfo) throws IOException {
+    Path file = new Path(splitMetaInfo.getSplitLocation());
+    long offset = splitMetaInfo.getStartOffset();
+    
+    // Split information read from local filesystem.
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    file = fs.makeQualified(file);
+    LOG.info("Reading input split file from : " + file);
+    FSDataInputStream inFile = fs.open(file);
+    inFile.seek(offset);
+    String className = Text.readString(inFile);
+    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
+    try {
+      cls = 
+          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
+          jobConf.getClassByName(className);
+    } catch (ClassNotFoundException ce) {
+      IOException wrap = new IOException("Split class " + className + 
+          " not found");
+      wrap.initCause(ce);
+      throw wrap;
+    }
+    SerializationFactory factory = new SerializationFactory(jobConf);
+    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
+        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
+        factory.getDeserializer(cls);
+    deserializer.open(inFile);
+    org.apache.hadoop.mapreduce.InputSplit split = 
+        deserializer.deserialize(null);
+    long pos = inFile.getPos();
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
+    inFile.close();
+    return split;
+  }
+
+  private void setIncrementalConfigParams(InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      this.incrementalConf = new Configuration(false);
+
+      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
+          .toString());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
+          fileSplit.getStart());
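+      // Note: despite its name, JobContext.MAP_INPUT_PATH resolves to
+      // "mapreduce.map.input.length", so setting it to the split length below
+      // mirrors what MapReduce's own MapTask does for old-API splits.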
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
+          fileSplit.getLength());
+    }
+    LOG.info("Processing split: " + inputSplit);
+  }
+
+  private long getInputBytes() {
+    if (fsStats == null) return 0;
+    long bytesRead = 0;
+    for (Statistics stat: fsStats) {
+      bytesRead = bytesRead + stat.getBytesRead();
+    }
+    return bytesRead;
+  }
+
+  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
+      throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
+        FileSystem.getLocal(conf));
+    return allTaskSplitMetaInfo;
+  }
+}
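
Taken together, the intended call sequence from a consuming processor is roughly (a hypothetical sketch; in practice the Tez framework drives these calls):

    SimpleInput input = new SimpleInput();
    List<Event> events = input.initialize(inputContext); // reads split meta info, builds the record reader
    KVReader reader = input.getReader();
    // ... iterate with reader.next() / reader.getCurrentKV() ...
    input.close();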