You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/05 07:32:25 UTC

tez git commit: TEZ-2392. Have all readers throw an Exception on incorrect next() usage (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 210619a56 -> b0054628d


TEZ-2392. Have all readers throw an Exception on incorrect next() usage (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0054628
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0054628
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0054628

Branch: refs/heads/master
Commit: b0054628df8d6d01cfed9bf850759ebc39c1e3b7
Parents: 210619a
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue May 5 11:02:07 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue May 5 11:02:07 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/mapreduce/input/MRInput.java |   3 +-
 .../tez/mapreduce/lib/MRReaderMapReduce.java    |   5 +-
 .../tez/mapreduce/lib/MRReaderMapred.java       |   3 +
 .../tez/mapreduce/input/TestMultiMRInput.java   |  13 ++
 .../tez/mapreduce/lib/TestKVReadersWithMR.java  | 178 +++++++++++++++++++
 .../tez/runtime/library/api/KeyValueReader.java |  17 ++
 .../runtime/library/api/KeyValuesReader.java    |  17 ++
 .../runtime/library/common/ValuesIterator.java  |  18 ++
 .../common/readers/UnorderedKVReader.java       |  10 +-
 .../input/ConcatenatedMergedKeyValueInput.java  |   4 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |   4 +-
 .../library/input/OrderedGroupedKVInput.java    |   2 +
 .../input/OrderedGroupedMergedKVInput.java      |   3 +
 .../runtime/library/input/UnorderedKVInput.java |   2 +
 .../library/common/TestValuesIterator.java      |  21 ++-
 .../common/readers/TestUnorderedKVReader.java   | 168 +++++++++++++++++
 .../input/TestSortedGroupedMergedInput.java     | 143 ++++++++++++++-
 18 files changed, 595 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ff7601..816c7a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2392. Have all readers throw an Exception on incorrect next() usage.
   TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2.
   TEZ-2405. PipelinedSorter can throw NPE with custom compartor.
   TEZ-1897. Create a concurrent version of AsyncDispatcher

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 991f6d1..270f68f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -493,7 +493,8 @@ public class MRInput extends MRInputBase {
 
   /**
    * Returns a {@link KeyValueReader} that can be used to read 
-   * Map Reduce compatible key value data
+   * Map Reduce compatible key value data. If next() is invoked again after it has returned
+   * false, an exception is thrown, either by the framework or by the underlying InputFormat.
    */
   @Override
   public KeyValueReader getReader() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 39cd79c..0495751 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -44,7 +44,7 @@ public class MRReaderMapReduce extends MRReader {
   @SuppressWarnings("rawtypes")
   private final InputFormat inputFormat;
   @SuppressWarnings("rawtypes")
-  private RecordReader recordReader;
+  protected RecordReader recordReader;
   private InputSplit inputSplit;
 
   private boolean setupComplete = false;
@@ -120,6 +120,9 @@ public class MRReaderMapReduce extends MRReader {
     }
     if (hasNext) {
       inputRecordCounter.increment(1);
+    } else {
+      hasCompletedProcessing();
+      completedProcessing = true;
     }
     return hasNext;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index c4ad7a4..366e7a7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -113,6 +113,9 @@ public class MRReaderMapred extends MRReader {
     boolean hasNext = recordReader.next(key, value);
     if (hasNext) {
       inputRecordCounter.increment(1);
+    } else {
+      hasCompletedProcessing();
+      completedProcessing = true;
     }
     return hasNext;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 55f6bff..4031140 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -131,6 +131,12 @@ public class TestMultiMRInput {
         Object val = reader.getCurrentValue();
         assertEquals(val, data1.remove(key));
       }
+      try {
+        boolean hasNext = reader.next(); //should throw exception
+        fail();
+      } catch(IOException e) {
+        assertTrue(e.getMessage().contains("For usage, please refer to"));
+      }
     }
     assertEquals(1, readerCount);
   }
@@ -198,6 +204,13 @@ public class TestMultiMRInput {
         Object val = reader.getCurrentValue();
         assertEquals(val, data.remove(key));
       }
+
+      try {
+        boolean hasNext = reader.next(); //should throw exception
+        fail();
+      } catch(IOException e) {
+        assertTrue(e.getMessage().contains("For usage, please refer to"));
+      }
     }
     assertEquals(2, readerCount);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
new file mode 100644
index 0000000..65f5ad0
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestKVReadersWithMR {
+
+  private JobConf conf;
+  private TezCounters counters;
+  private TezCounter inputRecordCounter;
+
+  @Before
+  public void setup() {
+    conf = new JobConf();
+    counters = new TezCounters();
+    inputRecordCounter = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+  }
+
+  @Test(timeout = 10000)
+  public void testMRReaderMapred() throws IOException {
+    //empty
+    testWithSpecificNumberOfKV(0);
+
+    testWithSpecificNumberOfKV(10);
+
+    //empty
+    testWithSpecificNumberOfKV_MapReduce(0);
+
+    testWithSpecificNumberOfKV_MapReduce(10);
+  }
+
+  public void testWithSpecificNumberOfKV(int kvPairs) throws IOException {
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter);
+
+    reader.recordReader = new DummyRecordReader(kvPairs);
+    int records = 0;
+    while (reader.next()) {
+      records++;
+    }
+    assertTrue(kvPairs == records);
+
+    //reading again should fail
+    try {
+      boolean hasNext = reader.next();
+      fail();
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("For usage, please refer to"));
+    }
+
+  }
+
+  public void testWithSpecificNumberOfKV_MapReduce(int kvPairs) throws IOException {
+    MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1,
+        10, 20, 30);
+
+    reader.recordReader = new DummyRecordReaderMapReduce(kvPairs);
+    int records = 0;
+    while (reader.next()) {
+      records++;
+    }
+    assertTrue(kvPairs == records);
+
+    //reading again should fail
+    try {
+      boolean hasNext = reader.next();
+      fail();
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("For usage, please refer to"));
+    }
+  }
+
+  static class DummyRecordReader implements RecordReader {
+    int records;
+
+    public DummyRecordReader(int records) {
+      this.records = records;
+    }
+
+    @Override
+    public boolean next(Object o, Object o2) throws IOException {
+      return (records-- > 0);
+    }
+
+    @Override
+    public Object createKey() {
+      return null;
+    }
+
+    @Override
+    public Object createValue() {
+      return null;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
+  static class DummyRecordReaderMapReduce extends org.apache.hadoop.mapreduce.RecordReader {
+    int records;
+
+    public DummyRecordReaderMapReduce(int records) {
+      this.records = records;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+        throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return (records-- > 0);
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
index 67b6f85..d504d08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader;
  *   Object value = kvReader.getCurrentValue();
  * </code>
  *
+ * If next() is called again after it has returned false,
+ * an IOException is thrown.
  */
 @Public
 @Evolving
 public abstract class KeyValueReader extends Reader {
 
+  protected boolean completedProcessing;
+
   /**
    * Moves to the next key/values(s) pair
    * 
@@ -62,4 +66,17 @@ public abstract class KeyValueReader extends Reader {
    * @throws IOException
    */
   public abstract Object getCurrentValue() throws IOException;
+
+  /**
+   * Fails if completedProcessing is already set; per the message, next() was called after false.
+   *
+   * @throws IOException if completedProcessing is true when this method is invoked
+   */
+  protected void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Please check if you are"
+          + " invoking next() even after it returned false. For usage, please refer to "
+          + "KeyValueReader javadocs");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
index 0bb2777..510f4b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader;
  *   Iterable<Object> values = kvReader.getCurrentValues();
  * </code>
  *
+ * If next() is called again after it has returned false,
+ * an IOException is thrown.
  */
 @Public
 @Evolving
 public abstract class KeyValuesReader extends Reader {
 
+  protected boolean completedProcessing;
+
   /**
    * Moves to the next key/values(s) pair
    * 
@@ -60,4 +64,17 @@ public abstract class KeyValuesReader extends Reader {
    * @return an Iterable view of the values associated with the current key
    */
   public abstract Iterable<Object> getCurrentValues() throws IOException;
+
+  /**
+   * Fails if completedProcessing is already set; per the message, next() was called after false.
+   *
+   * @throws IOException if completedProcessing is true when this method is invoked
+   */
+  protected void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Please check if you are"
+          + " invoking next() even after it returned false. For usage, please refer to "
+          + "KeyValuesReader javadocs");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index a1f52e7..24f9f8a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -63,6 +63,8 @@ public class ValuesIterator<KEY,VALUE> {
   private int keyCtr = 0;
   private boolean hasMoreValues; // For the current key.
   private boolean isFirstRecord = true;
+
+  private boolean completedProcessing;
   
   public ValuesIterator (TezRawKeyValueIterator in, 
                          RawComparator<KEY> comparator, 
@@ -99,6 +101,10 @@ public class ValuesIterator<KEY,VALUE> {
     } else {
       nextKey();
     }
+    if (!more) {
+      hasCompletedProcessing();
+      completedProcessing = true;
+    }
     return more;
   }
 
@@ -206,4 +212,16 @@ public class ValuesIterator<KEY,VALUE> {
         nextValueBytes.getLength() - nextValueBytes.getPosition());
     value = valDeserializer.deserialize(value);
   }
+
+  /**
+   * Fails if completedProcessing is already set; per the message, moveToNext() ran after false.
+   *
+   * @throws IOException if completedProcessing is true when this method is invoked
+   */
+  protected void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Please check if you are invoking moveToNext() even after it returned"
+          + " false.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 46af66d..b14a461 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -71,8 +71,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
 
-  private boolean completedProcessing;
-  
+
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
       TezCounter inputRecordCounter)
@@ -131,13 +130,6 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     }
   }
 
-  private void hasCompletedProcessing() throws IOException {
-    if (completedProcessing) {
-      throw new IOException("Reader has already processed all the inputs. Please check if you are"
-          + " invoking next() even after it returned false. For usage, please refer to "
-          + "KeyValueReader javadocs");
-    }
-  }
 
   @Override
   public Object getCurrentKey() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 39e0fff..14b1e2c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -46,11 +46,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
   public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
     private int currentReaderIndex = 0;
     private KeyValueReader currentReader;
-    
+
     @Override
     public boolean next() throws IOException {
       while ((currentReader == null) || !currentReader.next()) {
         if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
           return false;
         }
         try {

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 0cc3244..2a1e4c6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -47,11 +47,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
   public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
     private int currentReaderIndex = 0;
     private KeyValuesReader currentReader;
-    
+
     @Override
     public boolean next() throws IOException {
       while ((currentReader == null) || !currentReader.next()) {
         if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
           return false;
         }
         try {

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index e61dbdc..d784fcd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -218,6 +218,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
         return new KeyValuesReader() {
           @Override
           public boolean next() throws IOException {
+            hasCompletedProcessing();
+            completedProcessing = true;
             return false;
           }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 9adac54..41ca7c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -126,6 +126,9 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
         currentKey = nextKVReader.getCurrentKey();
         currentValues.moveToNext();
         return true;
+      } else {
+        hasCompletedProcessing();
+        completedProcessing = true;
       }
       return false;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ce27103..62fa9a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -164,6 +164,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
       return new KeyValueReader() {
         @Override
         public boolean next() throws IOException {
+          hasCompletedProcessing();
+          completedProcessing = true;
           return false;
         }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index e1718c8..edb9b15 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -65,6 +65,7 @@ import java.util.TreeMap;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -208,10 +209,19 @@ public class TestValuesIterator {
   @Test(timeout = 20000)
   public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
     ValuesIterator iterator = createEmptyIterator(false);
-    assert(iterator.moveToNext() == false);
+    assertTrue(iterator.moveToNext() == false);
 
     iterator = createEmptyIterator(true);
-    assert(iterator.moveToNext() == false);
+    assertTrue(iterator.moveToNext() == false);
+  }
+
+  private void getNextFromFinishedIterator(ValuesIterator iterator) {
+    try {
+      boolean hasNext = iterator.moveToNext();
+      fail();
+    } catch(IOException e) {
+      assertTrue(e.getMessage().contains("Please check if you are invoking moveToNext()"));
+    }
   }
 
   private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
@@ -292,7 +302,14 @@ public class TestValuesIterator {
     }
     if (expectedTestResult) {
       assertTrue(result);
+
+      assertFalse(valuesIterator.moveToNext());
+      getNextFromFinishedIterator(valuesIterator);
     } else {
+      while(valuesIterator.moveToNext()) {
+        //iterate through all keys
+      }
+      getNextFromFinishedIterator(valuesIterator);
       assertFalse(result);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
new file mode 100644
index 0000000..51ea42d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.readers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
+import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import static junit.framework.TestCase.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+public class TestUnorderedKVReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedKVReader.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  private String outputFileName = "ifile.out";
+  private Path outputPath;
+  private long rawLen;
+  private long compLen;
+
+  private UnorderedKVReader<Text, Text> unorderedKVReader;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")),
+          TestUnorderedKVReader.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    outputPath = new Path(workDir, outputFileName);
+    setupReader();
+  }
+
+  private void setupReader() throws IOException, InterruptedException {
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+
+    createIFile(outputPath, 1);
+
+    final LinkedList<LocalDiskFetchedInput> inputs = new LinkedList<LocalDiskFetchedInput>();
+    LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, rawLen, compLen, new
+        InputAttemptIdentifier(0, 0), outputPath, defaultConf, new FetchedInputCallback() {
+      @Override
+      public void fetchComplete(FetchedInput fetchedInput) {
+      }
+
+      @Override
+      public void fetchFailed(FetchedInput fetchedInput) {
+      }
+
+      @Override
+      public void freeResources(FetchedInput fetchedInput) {
+      }
+    });
+    LocalDiskFetchedInput fetchedInput = spy(realFetchedInput);
+    doNothing().when(fetchedInput).free();
+
+    inputs.add(fetchedInput);
+
+    TezCounters counters = new TezCounters();
+    TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+
+    ShuffleManager manager = mock(ShuffleManager.class);
+    doAnswer(new Answer() {
+      @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return (inputs.isEmpty()) ? null : inputs.remove();
+      }
+    }).when(manager).getNextInput();
+
+    unorderedKVReader = new UnorderedKVReader<Text, Text>(manager,
+        defaultConf, null, false, -1, -1, inputRecords);
+  }
+
+  private void createIFile(Path path, int recordCount) throws IOException {
+    FSDataOutputStream out = localFs.create(path);
+    IFile.Writer writer =
+        new IFile.Writer(defaultConf, out, Text.class, Text.class, null, null, null, true);
+
+    for (int i = 0; i < recordCount; i++) {
+      writer.append(new Text("Key_" + i), new Text("Value_" + i));
+    }
+    writer.close();
+    rawLen = writer.getRawLength();
+    compLen = writer.getCompressedLength();
+    out.close();
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testReadingMultipleTimes() throws Exception {
+    int counter = 0;
+    while (unorderedKVReader.next()) {
+      unorderedKVReader.getCurrentKey();
+      unorderedKVReader.getCurrentKey();
+      counter++;
+    }
+    Assert.assertEquals(1, counter);
+
+    //Check the reader again. This shouldn't throw EOF exception in IFile
+    try {
+      boolean next = unorderedKVReader.next();
+      fail();
+    } catch(IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains("For usage, please refer to"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 570deb7..0de400e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -19,7 +19,7 @@
 package org.apache.tez.runtime.library.input;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
@@ -35,7 +35,9 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.junit.Test;
 
@@ -82,6 +84,18 @@ public class TestSortedGroupedMergedInput {
       }
       assertEquals(6, valCount);
     }
+
+    getNextFromFinishedReader(kvsReader);
+  }
+
+  /**
+   * Calls next() once more on a KeyValuesReader that has already reported the
+   * end of its data and asserts that an IOException carrying the usage hint
+   * is thrown.
+   *
+   * @param kvsReader reader that has already returned false from next()
+   */
+  private void getNextFromFinishedReader(KeyValuesReader kvsReader) {
+    //Try reading again and it should throw IOException
+    try {
+      kvsReader.next();
+      fail("Expected IOException from next() on a finished KeyValuesReader");
+    } catch(IOException e) {
+      assertTrue(e.getMessage().contains("For usage, please refer to"));
+    }
   }
 
   @Test(timeout = 5000)
@@ -126,6 +140,7 @@ public class TestSortedGroupedMergedInput {
       }
       assertEquals(6, valCount);
     }
+    getNextFromFinishedReader(kvsReader);
   }
 
   @Test(timeout = 5000)
@@ -172,6 +187,7 @@ public class TestSortedGroupedMergedInput {
         assertEquals(6, valCount);
       }
     }
+    getNextFromFinishedReader(kvsReader);
   }
 
   @Test(timeout = 5000)
@@ -223,6 +239,7 @@ public class TestSortedGroupedMergedInput {
         fail("Unexpected key");
       }
     }
+    getNextFromFinishedReader(kvsReader);
   }
 
   @Test(timeout = 5000)
@@ -277,6 +294,7 @@ public class TestSortedGroupedMergedInput {
         fail("Unexpected key");
       }
     }
+    getNextFromFinishedReader(kvsReader);
   }
   
   // Reads all values for a key, but doesn't trigger the last hasNext() call.
@@ -324,6 +342,7 @@ public class TestSortedGroupedMergedInput {
       }
       assertEquals(6, valCount);
     }
+    getNextFromFinishedReader(kvsReader);
   }
 
   @Test(timeout = 5000)
@@ -350,7 +369,84 @@ public class TestSortedGroupedMergedInput {
     OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
 
     KeyValuesReader kvsReader = input.getReader();
-    assertFalse(kvsReader.next());
+    assertTrue(kvsReader.next() == false);
+    getNextFromFinishedReader(kvsReader);
+  }
+
+  /**
+   * Concatenates three DummyInputs of 10 records each and expects the merged
+   * reader to yield 30 Integer key/value pairs, then to throw an IOException
+   * on any further next() call.
+   */
+  @Test(timeout = 5000)
+  public void testSimpleConcatenatedMergedKeyValueInput() throws Exception {
+
+    DummyInput sInput1 = new DummyInput(10);
+    DummyInput sInput2 = new DummyInput(10);
+    DummyInput sInput3 = new DummyInput(10);
+
+    List<Input> sInputs = new LinkedList<Input>();
+    sInputs.add(sInput1);
+    sInputs.add(sInput2);
+    sInputs.add(sInput3);
+    ConcatenatedMergedKeyValueInput input =
+        new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs);
+
+    KeyValueReader kvReader = input.getReader();
+    int keyCount = 0;
+    while (kvReader.next()) {
+      keyCount++;
+      // Type check replaces the former unused casts to Integer.
+      assertTrue(kvReader.getCurrentKey() instanceof Integer);
+      assertTrue(kvReader.getCurrentValue() instanceof Integer);
+    }
+    // assertEquals reports expected vs. actual on failure, unlike assertTrue(a == b).
+    assertEquals(30, keyCount);
+
+    getNextFromFinishedReader(kvReader);
+  }
+
+  /**
+   * Builds three SortedTestInputs over identical readers (keys 1, 2, 3), feeds
+   * them to a ConcatenatedMergedKeyValuesInput and asserts 9 keys with 2 values
+   * each. Afterwards a further next() call must throw an IOException.
+   */
+  @Test(timeout = 5000)
+  public void testSimpleConcatenatedMergedKeyValuesInput() throws Exception {
+    List<Input> sInputs = new LinkedList<Input>();
+    for (int i = 0; i < 3; i++) {
+      // Every input gets its own reader instance over the same data.
+      SortedTestKeyValuesReader sourceReader = new SortedTestKeyValuesReader(
+          new int[] { 1, 2, 3 }, new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+      sInputs.add(new SortedTestInput(sourceReader));
+    }
+    ConcatenatedMergedKeyValuesInput input =
+        new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs);
+
+    KeyValuesReader kvsReader = input.getReader();
+    int keyCount = 0;
+    while (kvsReader.next()) {
+      keyCount++;
+      Integer key = (Integer) kvsReader.getCurrentKey();
+      int valCount = 0;
+      for (Object rawValue : kvsReader.getCurrentValues()) {
+        valCount++;
+        Integer val = (Integer) rawValue;
+      }
+      assertEquals(2, valCount);
+    }
+    assertEquals(9, keyCount);
+
+    getNextFromFinishedReader(kvsReader);
+  }
+
+  /**
+   * Calls next() once more on a KeyValueReader that has already reported the
+   * end of its data and asserts that an IOException carrying the usage hint
+   * is thrown.
+   *
+   * @param kvReader reader that has already returned false from next()
+   */
+  private void getNextFromFinishedReader(KeyValueReader kvReader) {
+    //Try reading again and it should throw IOException
+    try {
+      kvReader.next();
+      fail("Expected IOException from next() on a finished KeyValueReader");
+    } catch(IOException e) {
+      assertTrue(e.getMessage().contains("For usage, please refer to"));
+    }
   }
 
   private static class SortedTestInput extends OrderedGroupedKVInput {
@@ -404,8 +500,10 @@ public class TestSortedGroupedMergedInput {
 
     @Override
     public boolean next() throws IOException {
+      hasCompletedProcessing();
       currentIndex++;
       if (keys == null || currentIndex >= keys.length) {
+        completedProcessing = true;
         return false;
       }
       return true;
@@ -426,6 +524,47 @@ public class TestSortedGroupedMergedInput {
     }
   }
 
+  /**
+   * Minimal Input used by the concatenated-input test. It owns a single
+   * DummyKeyValueReader created in the constructor and returns that same
+   * instance from getReader(); start() does nothing.
+   */
+  private static class DummyInput implements Input {
+    // Assigned once in the constructor and never replaced.
+    private final DummyKeyValueReader reader;
+
+    /**
+     * @param records record count passed on to the DummyKeyValueReader
+     */
+    public DummyInput(int records) {
+      reader = new DummyKeyValueReader(records);
+    }
+
+    @Override
+    public void start() throws Exception {
+      // Nothing to start for the dummy input.
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return reader;
+    }
+  }
+
+  /**
+   * Test reader driven by a countdown. next() returns true while the counter
+   * is positive and decrements it on every call; key and value are both the
+   * counter's value after the latest decrement.
+   */
+  private static class DummyKeyValueReader extends KeyValueReader {
+    private int remaining;
+
+    public DummyKeyValueReader(int records) {
+      remaining = records;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      // Same as a post-decrement comparison: test first, then count down.
+      boolean hasRecord = remaining > 0;
+      remaining--;
+      return hasRecord;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return remaining;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return remaining;
+    }
+  }
+
+
   private static class RawComparatorForTest implements RawComparator<Integer> {
 
     @Override