You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:20 UTC
[18/50] [abbrv] tez git commit: TEZ-2392. Have all readers throw an
Exception on incorrect next() usage (rbalamohan)
TEZ-2392. Have all readers throw an Exception on incorrect next() usage (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b0054628
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b0054628
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b0054628
Branch: refs/heads/TEZ-2003
Commit: b0054628df8d6d01cfed9bf850759ebc39c1e3b7
Parents: 210619a
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Tue May 5 11:02:07 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Tue May 5 11:02:07 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/mapreduce/input/MRInput.java | 3 +-
.../tez/mapreduce/lib/MRReaderMapReduce.java | 5 +-
.../tez/mapreduce/lib/MRReaderMapred.java | 3 +
.../tez/mapreduce/input/TestMultiMRInput.java | 13 ++
.../tez/mapreduce/lib/TestKVReadersWithMR.java | 178 +++++++++++++++++++
.../tez/runtime/library/api/KeyValueReader.java | 17 ++
.../runtime/library/api/KeyValuesReader.java | 17 ++
.../runtime/library/common/ValuesIterator.java | 18 ++
.../common/readers/UnorderedKVReader.java | 10 +-
.../input/ConcatenatedMergedKeyValueInput.java | 4 +-
.../input/ConcatenatedMergedKeyValuesInput.java | 4 +-
.../library/input/OrderedGroupedKVInput.java | 2 +
.../input/OrderedGroupedMergedKVInput.java | 3 +
.../runtime/library/input/UnorderedKVInput.java | 2 +
.../library/common/TestValuesIterator.java | 21 ++-
.../common/readers/TestUnorderedKVReader.java | 168 +++++++++++++++++
.../input/TestSortedGroupedMergedInput.java | 143 ++++++++++++++-
18 files changed, 595 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ff7601..816c7a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-2392. Have all readers throw an Exception on incorrect next() usage.
TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2.
TEZ-2405. PipelinedSorter can throw NPE with custom compartor.
TEZ-1897. Create a concurrent version of AsyncDispatcher
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 991f6d1..270f68f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -493,7 +493,8 @@ public class MRInput extends MRInputBase {
/**
* Returns a {@link KeyValueReader} that can be used to read
- * Map Reduce compatible key value data
+ * Map Reduce compatible key value data. An IOException is thrown if next() is invoked
+ * again after it has returned false (end of input from the framework or the InputFormat).
*/
@Override
public KeyValueReader getReader() throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 39cd79c..0495751 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -44,7 +44,7 @@ public class MRReaderMapReduce extends MRReader {
@SuppressWarnings("rawtypes")
private final InputFormat inputFormat;
@SuppressWarnings("rawtypes")
- private RecordReader recordReader;
+ protected RecordReader recordReader;
private InputSplit inputSplit;
private boolean setupComplete = false;
@@ -120,6 +120,9 @@ public class MRReaderMapReduce extends MRReader {
}
if (hasNext) {
inputRecordCounter.increment(1);
+ } else {
+ hasCompletedProcessing();
+ completedProcessing = true;
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index c4ad7a4..366e7a7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -113,6 +113,9 @@ public class MRReaderMapred extends MRReader {
boolean hasNext = recordReader.next(key, value);
if (hasNext) {
inputRecordCounter.increment(1);
+ } else {
+ hasCompletedProcessing();
+ completedProcessing = true;
}
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index 55f6bff..4031140 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -131,6 +131,12 @@ public class TestMultiMRInput {
Object val = reader.getCurrentValue();
assertEquals(val, data1.remove(key));
}
+ try {
+ boolean hasNext = reader.next(); //should throw exception
+ fail();
+ } catch(IOException e) {
+ assertTrue(e.getMessage().contains("For usage, please refer to"));
+ }
}
assertEquals(1, readerCount);
}
@@ -198,6 +204,13 @@ public class TestMultiMRInput {
Object val = reader.getCurrentValue();
assertEquals(val, data.remove(key));
}
+
+ try {
+ boolean hasNext = reader.next(); //should throw exception
+ fail();
+ } catch(IOException e) {
+ assertTrue(e.getMessage().contains("For usage, please refer to"));
+ }
}
assertEquals(2, readerCount);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
new file mode 100644
index 0000000..65f5ad0
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.lib;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Verifies that the MapReduce-backed readers ({@link MRReaderMapred} and
+ * {@link MRReaderMapReduce}) return every record from the underlying RecordReader
+ * and then throw an IOException if next() is invoked again after it returned false.
+ */
+public class TestKVReadersWithMR {
+
+  private JobConf conf;
+  private TezCounters counters;
+  private TezCounter inputRecordCounter;
+
+  @Before
+  public void setup() {
+    conf = new JobConf();
+    counters = new TezCounters();
+    inputRecordCounter = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+  }
+
+  @Test(timeout = 10000)
+  public void testMRReaderMapred() throws IOException {
+    verifyMapredReader(0); // empty input
+    verifyMapredReader(10);
+  }
+
+  @Test(timeout = 10000)
+  public void testMRReaderMapReduce() throws IOException {
+    verifyMapReduceReader(0); // empty input
+    verifyMapReduceReader(10);
+  }
+
+  /** Reads {@code kvPairs} records via {@link MRReaderMapred}, then checks next() fails. */
+  private void verifyMapredReader(int kvPairs) throws IOException {
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter);
+    reader.recordReader = new DummyRecordReader(kvPairs);
+    assertEquals(kvPairs, drain(reader));
+    assertNextThrows(reader);
+  }
+
+  /** Reads {@code kvPairs} records via {@link MRReaderMapReduce}, then checks next() fails. */
+  private void verifyMapReduceReader(int kvPairs) throws IOException {
+    MRReaderMapReduce reader = new MRReaderMapReduce(conf, counters, inputRecordCounter, -1, 1,
+        10, 20, 30);
+    reader.recordReader = new DummyRecordReaderMapReduce(kvPairs);
+    assertEquals(kvPairs, drain(reader));
+    assertNextThrows(reader);
+  }
+
+  /** Calls next() until it returns false and returns how many times it returned true. */
+  private static int drain(KeyValueReader reader) throws IOException {
+    int records = 0;
+    while (reader.next()) {
+      records++;
+    }
+    return records;
+  }
+
+  /** Asserts that next() on an already-exhausted reader throws the usage IOException. */
+  private static void assertNextThrows(KeyValueReader reader) {
+    try {
+      reader.next();
+      fail("next() after end of input should have thrown an IOException");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("For usage, please refer to"));
+    }
+  }
+
+  /** Old-API RecordReader stub that reports {@code records} records and then end of input. */
+  @SuppressWarnings("rawtypes")
+  static class DummyRecordReader implements RecordReader {
+    int records;
+
+    public DummyRecordReader(int records) {
+      this.records = records;
+    }
+
+    @Override
+    public boolean next(Object o, Object o2) throws IOException {
+      return (records-- > 0);
+    }
+
+    @Override
+    public Object createKey() {
+      return null;
+    }
+
+    @Override
+    public Object createValue() {
+      return null;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+  }
+
+  /** New-API RecordReader stub that reports {@code records} records and then end of input. */
+  @SuppressWarnings("rawtypes")
+  static class DummyRecordReaderMapReduce extends org.apache.hadoop.mapreduce.RecordReader {
+    int records;
+
+    public DummyRecordReaderMapReduce(int records) {
+      this.records = records;
+    }
+
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+        throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return (records-- > 0);
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
index 67b6f85..d504d08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader;
* Object value = kvReader.getCurrentValue();
* </code>
*
+ * If next() is called again after it has returned false,
+ * an IOException is thrown.
*/
@Public
@Evolving
public abstract class KeyValueReader extends Reader {
+ protected boolean completedProcessing;
+
/**
* Moves to the next key/values(s) pair
*
@@ -62,4 +66,17 @@ public abstract class KeyValueReader extends Reader {
* @throws IOException
*/
public abstract Object getCurrentValue() throws IOException;
+
+  /**
+   * Guards next(): throws if a subclass has already recorded that next() returned false.
+   *
+   * @throws IOException if {@code completedProcessing} is already set
+   */
+  protected void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Reader has already processed all the inputs. Please check if you are"
+          + " invoking next() even after it returned false. For usage, please refer to "
+          + "KeyValueReader javadocs");
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
index 0bb2777..510f4b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -34,11 +34,15 @@ import org.apache.tez.runtime.api.Reader;
* Iterable<Object> values = kvReader.getCurrentValues();
* </code>
*
+ * If next() is called again after it has returned false,
+ * an IOException is thrown.
*/
@Public
@Evolving
public abstract class KeyValuesReader extends Reader {
+ protected boolean completedProcessing;
+
/**
* Moves to the next key/values(s) pair
*
@@ -60,4 +64,17 @@ public abstract class KeyValuesReader extends Reader {
* @return an Iterable view of the values associated with the current key
*/
public abstract Iterable<Object> getCurrentValues() throws IOException;
+
+  /**
+   * Guards next(): throws if a subclass has already recorded that next() returned false.
+   *
+   * @throws IOException if {@code completedProcessing} is already set
+   */
+  protected void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Reader has already processed all the inputs. Please check if you are"
+          + " invoking next() even after it returned false. For usage, please refer to "
+          + "KeyValuesReader javadocs");
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index a1f52e7..24f9f8a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -63,6 +63,8 @@ public class ValuesIterator<KEY,VALUE> {
private int keyCtr = 0;
private boolean hasMoreValues; // For the current key.
private boolean isFirstRecord = true;
+
+ private boolean completedProcessing;
public ValuesIterator (TezRawKeyValueIterator in,
RawComparator<KEY> comparator,
@@ -99,6 +101,10 @@ public class ValuesIterator<KEY,VALUE> {
} else {
nextKey();
}
+ if (!more) {
+ hasCompletedProcessing();
+ completedProcessing = true;
+ }
return more;
}
@@ -206,4 +212,16 @@ public class ValuesIterator<KEY,VALUE> {
nextValueBytes.getLength() - nextValueBytes.getPosition());
value = valDeserializer.deserialize(value);
}
+
+ /**
+ * Guard invoked when the iterator finds no more keys: throws if it had already
+ * reported that state before, i.e. moveToNext() is being called again after it returned false.
+ *
+ * @throws IOException if {@code completedProcessing} is already set
+ */
+ protected void hasCompletedProcessing() throws IOException {
+ if (completedProcessing) {
+ throw new IOException("Please check if you are invoking moveToNext() even after it returned"
+ + " false.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 46af66d..b14a461 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -71,8 +71,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
// the counter at the moment will generate aggregate numbers.
private int numRecordsRead = 0;
- private boolean completedProcessing;
-
+
public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
TezCounter inputRecordCounter)
@@ -131,13 +130,6 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
}
}
- private void hasCompletedProcessing() throws IOException {
- if (completedProcessing) {
- throw new IOException("Reader has already processed all the inputs. Please check if you are"
- + " invoking next() even after it returned false. For usage, please refer to "
- + "KeyValueReader javadocs");
- }
- }
@Override
public Object getCurrentKey() throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 39e0fff..14b1e2c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -46,11 +46,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
private int currentReaderIndex = 0;
private KeyValueReader currentReader;
-
+
@Override
public boolean next() throws IOException {
while ((currentReader == null) || !currentReader.next()) {
if (currentReaderIndex == getInputs().size()) {
+ hasCompletedProcessing();
+ completedProcessing = true;
return false;
}
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 0cc3244..2a1e4c6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -47,11 +47,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
private int currentReaderIndex = 0;
private KeyValuesReader currentReader;
-
+
@Override
public boolean next() throws IOException {
while ((currentReader == null) || !currentReader.next()) {
if (currentReaderIndex == getInputs().size()) {
+ hasCompletedProcessing();
+ completedProcessing = true;
return false;
}
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index e61dbdc..d784fcd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -218,6 +218,8 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
return new KeyValuesReader() {
@Override
public boolean next() throws IOException {
+ hasCompletedProcessing();
+ completedProcessing = true;
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 9adac54..41ca7c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -126,6 +126,9 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
currentKey = nextKVReader.getCurrentKey();
currentValues.moveToNext();
return true;
+ } else {
+ hasCompletedProcessing();
+ completedProcessing = true;
}
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index ce27103..62fa9a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -164,6 +164,8 @@ public class UnorderedKVInput extends AbstractLogicalInput {
return new KeyValueReader() {
@Override
public boolean next() throws IOException {
+ hasCompletedProcessing();
+ completedProcessing = true;
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index e1718c8..edb9b15 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -65,6 +65,7 @@ import java.util.TreeMap;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -208,10 +209,19 @@ public class TestValuesIterator {
@Test(timeout = 20000)
public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
ValuesIterator iterator = createEmptyIterator(false);
- assert(iterator.moveToNext() == false);
+ assertTrue(iterator.moveToNext() == false);
iterator = createEmptyIterator(true);
- assert(iterator.moveToNext() == false);
+ assertTrue(iterator.moveToNext() == false);
+ }
+
+ private void getNextFromFinishedIterator(ValuesIterator iterator) {
+ try {
+ boolean hasNext = iterator.moveToNext();
+ fail();
+ } catch(IOException e) {
+ assertTrue(e.getMessage().contains("Please check if you are invoking moveToNext()"));
+ }
}
private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
@@ -292,7 +302,14 @@ public class TestValuesIterator {
}
if (expectedTestResult) {
assertTrue(result);
+
+ assertFalse(valuesIterator.moveToNext());
+ getNextFromFinishedIterator(valuesIterator);
} else {
+ while(valuesIterator.moveToNext()) {
+ //iterate through all keys
+ }
+ getNextFromFinishedIterator(valuesIterator);
assertFalse(result);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
new file mode 100644
index 0000000..51ea42d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.readers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
+import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests for {@link UnorderedKVReader}: reads a single-record IFile through a mocked
+ * {@link ShuffleManager} and checks that next() throws once the input is exhausted.
+ */
+public class TestUnorderedKVReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedKVReader.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  private String outputFileName = "ifile.out";
+  private Path outputPath;
+  private long rawLen;
+  private long compLen;
+
+  private UnorderedKVReader<Text, Text> unorderedKVReader;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")),
+          TestUnorderedKVReader.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // Clear leftovers from a previous run *before* writing the input file. This is done
+    // here rather than via a second @Before method because JUnit does not define the order
+    // of @Before methods within a class, and a cleanup running after setUp would delete
+    // the IFile the reader is about to consume.
+    cleanup();
+    outputPath = new Path(workDir, outputFileName);
+    setupReader();
+  }
+
+  private void setupReader() throws IOException, InterruptedException {
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+
+    createIFile(outputPath, 1);
+
+    final LinkedList<LocalDiskFetchedInput> inputs = new LinkedList<LocalDiskFetchedInput>();
+    LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, rawLen, compLen, new
+        InputAttemptIdentifier(0, 0), outputPath, defaultConf, new FetchedInputCallback() {
+      @Override
+      public void fetchComplete(FetchedInput fetchedInput) {
+      }
+
+      @Override
+      public void fetchFailed(FetchedInput fetchedInput) {
+      }
+
+      @Override
+      public void freeResources(FetchedInput fetchedInput) {
+      }
+    });
+    LocalDiskFetchedInput fetchedInput = spy(realFetchedInput);
+    doNothing().when(fetchedInput).free();
+
+    inputs.add(fetchedInput);
+
+    TezCounters counters = new TezCounters();
+    TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+
+    ShuffleManager manager = mock(ShuffleManager.class);
+    doAnswer(new Answer() {
+      @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return (inputs.isEmpty()) ? null : inputs.remove();
+      }
+    }).when(manager).getNextInput();
+
+    unorderedKVReader = new UnorderedKVReader<Text, Text>(manager,
+        defaultConf, null, false, -1, -1, inputRecords);
+  }
+
+  private void createIFile(Path path, int recordCount) throws IOException {
+    FSDataOutputStream out = localFs.create(path);
+    IFile.Writer writer =
+        new IFile.Writer(defaultConf, out, Text.class, Text.class, null, null, null, true);
+
+    for (int i = 0; i < recordCount; i++) {
+      writer.append(new Text("Key_" + i), new Text("Value_" + i));
+    }
+    writer.close();
+    rawLen = writer.getRawLength();
+    compLen = writer.getCompressedLength();
+    out.close();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testReadingMultipleTimes() throws Exception {
+    int counter = 0;
+    while (unorderedKVReader.next()) {
+      unorderedKVReader.getCurrentKey();
+      unorderedKVReader.getCurrentValue();
+      counter++;
+    }
+    Assert.assertEquals(1, counter);
+
+    // Reading again must fail with the reader's usage error rather than an EOF from IFile.
+    try {
+      unorderedKVReader.next();
+      fail("next() after end of input should have thrown an IOException");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains("For usage, please refer to"));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/b0054628/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 570deb7..0de400e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -19,7 +19,7 @@
package org.apache.tez.runtime.library.input;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -35,7 +35,9 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
+import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.junit.Test;
@@ -82,6 +84,18 @@ public class TestSortedGroupedMergedInput {
}
assertEquals(6, valCount);
}
+
+ getNextFromFinishedReader(kvsReader);
+ }
+
+  /**
+   * Asserts that calling {@code next()} on a reader that has already returned false
+   * fails with the usage IOException.
+   */
+  private void getNextFromFinishedReader(KeyValuesReader kvsReader) {
+    try {
+      kvsReader.next();
+      fail("Expected IOException when calling next() on a fully consumed reader");
+    } catch (IOException e) {
+      assertTrue("Unexpected message: " + e.getMessage(),
+          e.getMessage().contains("For usage, please refer to"));
+    }
}
@Test(timeout = 5000)
@@ -126,6 +140,7 @@ public class TestSortedGroupedMergedInput {
}
assertEquals(6, valCount);
}
+ getNextFromFinishedReader(kvsReader);
}
@Test(timeout = 5000)
@@ -172,6 +187,7 @@ public class TestSortedGroupedMergedInput {
assertEquals(6, valCount);
}
}
+ getNextFromFinishedReader(kvsReader);
}
@Test(timeout = 5000)
@@ -223,6 +239,7 @@ public class TestSortedGroupedMergedInput {
fail("Unexpected key");
}
}
+ getNextFromFinishedReader(kvsReader);
}
@Test(timeout = 5000)
@@ -277,6 +294,7 @@ public class TestSortedGroupedMergedInput {
fail("Unexpected key");
}
}
+ getNextFromFinishedReader(kvsReader);
}
// Reads all values for a key, but doesn't trigger the last hasNext() call.
@@ -324,6 +342,7 @@ public class TestSortedGroupedMergedInput {
}
assertEquals(6, valCount);
}
+ getNextFromFinishedReader(kvsReader);
}
@Test(timeout = 5000)
@@ -350,7 +369,84 @@ public class TestSortedGroupedMergedInput {
OrderedGroupedMergedKVInput input = new OrderedGroupedMergedKVInput(createMergedInputContext(), sInputs);
KeyValuesReader kvsReader = input.getReader();
- assertFalse(kvsReader.next());
+ assertTrue(kvsReader.next() == false);
+ getNextFromFinishedReader(kvsReader);
+ }
+
+  /**
+   * Concatenates three 10-record inputs, checks that all 30 records are returned, and
+   * that next() on the exhausted concatenated reader is rejected.
+   */
+  @Test(timeout = 5000)
+  public void testSimpleConcatenatedMergedKeyValueInput() throws Exception {
+    DummyInput sInput1 = new DummyInput(10);
+    DummyInput sInput2 = new DummyInput(10);
+    DummyInput sInput3 = new DummyInput(10);
+
+    List<Input> sInputs = new LinkedList<Input>();
+    sInputs.add(sInput1);
+    sInputs.add(sInput2);
+    sInputs.add(sInput3);
+    ConcatenatedMergedKeyValueInput input =
+        new ConcatenatedMergedKeyValueInput(createMergedInputContext(), sInputs);
+
+    KeyValueReader kvReader = input.getReader();
+    int keyCount = 0;
+    while (kvReader.next()) {
+      keyCount++;
+      Integer key = (Integer) kvReader.getCurrentKey();
+      Integer value = (Integer) kvReader.getCurrentValue();
+      // DummyKeyValueReader reports the same number as both key and value.
+      assertEquals(key, value);
+    }
+    assertEquals(30, keyCount);
+
+    getNextFromFinishedReader(kvReader);
+  }
+
+  /**
+   * Concatenates three grouped inputs (keys 1..3, two values per key), checks all 9 keys
+   * and their values are returned, and that next() on the exhausted reader is rejected.
+   */
+  @Test(timeout = 5000)
+  public void testSimpleConcatenatedMergedKeyValuesInput() throws Exception {
+    SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+        new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+    SortedTestKeyValuesReader kvsReader2 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+        new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+    SortedTestKeyValuesReader kvsReader3 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 },
+        new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } });
+
+    SortedTestInput sInput1 = new SortedTestInput(kvsReader1);
+    SortedTestInput sInput2 = new SortedTestInput(kvsReader2);
+    SortedTestInput sInput3 = new SortedTestInput(kvsReader3);
+
+    List<Input> sInputs = new LinkedList<Input>();
+    sInputs.add(sInput1);
+    sInputs.add(sInput2);
+    sInputs.add(sInput3);
+    ConcatenatedMergedKeyValuesInput input =
+        new ConcatenatedMergedKeyValuesInput(createMergedInputContext(), sInputs);
+
+    KeyValuesReader kvsReader = input.getReader();
+    int keyCount = 0;
+    while (kvsReader.next()) {
+      keyCount++;
+      Integer key = (Integer) kvsReader.getCurrentKey();
+      Iterator<Object> valuesIter = kvsReader.getCurrentValues().iterator();
+      int valCount = 0;
+      while (valuesIter.hasNext()) {
+        valCount++;
+        Integer val = (Integer) valuesIter.next();
+        // Every input above was built with values {k, k} for key k.
+        assertEquals(key, val);
+      }
+      assertEquals(2, valCount);
+    }
+    assertEquals(9, keyCount);
+
+    getNextFromFinishedReader(kvsReader);
+  }
+
+  /**
+   * Asserts that calling {@code next()} on a reader that has already returned false
+   * fails with the usage IOException.
+   */
+  private void getNextFromFinishedReader(KeyValueReader kvReader) {
+    try {
+      kvReader.next();
+      fail("Expected IOException when calling next() on a fully consumed reader");
+    } catch (IOException e) {
+      assertTrue("Unexpected message: " + e.getMessage(),
+          e.getMessage().contains("For usage, please refer to"));
+    }
}
private static class SortedTestInput extends OrderedGroupedKVInput {
@@ -404,8 +500,10 @@ public class TestSortedGroupedMergedInput {
@Override
public boolean next() throws IOException {
+ hasCompletedProcessing();
currentIndex++;
if (keys == null || currentIndex >= keys.length) {
+ completedProcessing = true;
return false;
}
return true;
@@ -426,6 +524,47 @@ public class TestSortedGroupedMergedInput {
}
}
+  /** Minimal {@link Input} whose getReader() always returns the same DummyKeyValueReader. */
+  private static class DummyInput implements Input {
+    private final DummyKeyValueReader reader;
+
+    public DummyInput(int records) {
+      reader = new DummyKeyValueReader(records);
+    }
+
+    @Override
+    public void start() throws Exception {
+      // Nothing to start: the reader is created eagerly in the constructor.
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return reader;
+    }
+  }
+
+  /**
+   * {@link KeyValueReader} stub yielding a fixed number of records. For each record, key and
+   * value are both the number of records still left after it (so records-1 down to 0).
+   * It does not itself reject next() calls made after it has returned false.
+   */
+  private static class DummyKeyValueReader extends KeyValueReader {
+    private int remaining;
+
+    public DummyKeyValueReader(int records) {
+      remaining = records;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      final boolean hasRecord = remaining > 0;
+      // Decrement on every call, including once the records are exhausted.
+      remaining--;
+      return hasRecord;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return remaining;
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return remaining;
+    }
+  }
+
+
private static class RawComparatorForTest implements RawComparator<Integer> {
@Override