You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/27 21:12:10 UTC
[1/2] git commit: TEZ-482. Change KVReader interface to have separate
methods to get key and value instead of getRecord. (sseth)
Updated Branches:
refs/heads/master 6368056f4 -> 034ca0a95
TEZ-482. Change KVReader interface to have separate methods to get key
and value instead of getRecord. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9fbc8d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9fbc8d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9fbc8d37
Branch: refs/heads/master
Commit: 9fbc8d37e5858bbc5eb68ff1d42400b424c9be5e
Parents: 6368056
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 27 12:11:18 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 27 12:11:18 2013 -0700
----------------------------------------------------------------------
.../processor/FilterByWordInputProcessor.java | 6 ++--
.../processor/FilterByWordOutputProcessor.java | 6 ++--
.../org/apache/tez/mapreduce/input/MRInput.java | 24 ++++++++-----
.../mapreduce/processor/map/MapProcessor.java | 4 +--
.../processor/reduce/ReduceProcessor.java | 4 +--
.../tez/runtime/library/api/KVReader.java | 38 +++++---------------
.../broadcast/input/BroadcastKVReader.java | 10 ++++--
.../library/input/ShuffledMergedInput.java | 9 +++--
8 files changed, 45 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 63ae5fd..c9517aa 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -37,7 +37,6 @@ import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVReader.KVRecord;
import org.apache.tez.runtime.library.api.KVWriter;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -112,9 +111,8 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
KVWriter kvWriter = kvOutput.getWriter();
while (kvReader.next()) {
- KVRecord kvRecord = kvReader.getCurrentKV();
- Object key = kvRecord.getKey();
- Object val = kvRecord.getValues().iterator().next();
+ Object key = kvReader.getCurrentKey();
+ Object val = kvReader.getCurrentValues().iterator().next();
Text valText = (Text) val;
String readVal = valText.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index 9d2a3b9..ac1101d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -31,7 +31,6 @@ import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVReader.KVRecord;
import org.apache.tez.runtime.library.api.KVWriter;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
@@ -87,9 +86,8 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
KVReader kvReader = kvInput.getReader();
KVWriter kvWriter = mrOutput.getWriter();
while (kvReader.next()) {
- KVRecord kvRecord = kvReader.getCurrentKV();
- Object key = kvRecord.getKey();
- Object value = kvRecord.getValues().iterator().next();
+ Object key = kvReader.getCurrentKey();
+ Object value = kvReader.getCurrentValues().iterator().next();
kvWriter.write(key, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ed675a4..0b4ae7f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -421,22 +421,30 @@ public class MRInput implements LogicalInput {
}
@Override
- public KVRecord getCurrentKV() throws IOException {
- KVRecord kvRecord = null;
+ public Object getCurrentKey() throws IOException {
+ if (localNewApi) {
+ try {
+ return newRecordReader.getCurrentKey();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while fetching next key", e);
+ }
+ } else {
+ return key;
+ }
+ }
+
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
if (localNewApi) {
try {
valueIterator.setValue(newRecordReader.getCurrentValue());
- kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while fetching next key-value", e);
+ throw new IOException("Interrupted while fetching next value(s)", e);
}
-
} else {
valueIterator.setValue(value);
- kvRecord = new KVRecord(key, valueIterable);
}
- return kvRecord;
+ return valueIterable;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 58eb109..54f6da9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -229,13 +229,13 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
@Override
public Object getCurrentKey() throws IOException,
InterruptedException {
- return reader.getCurrentKV().getKey();
+ return reader.getCurrentKey();
}
@Override
public Object getCurrentValue() throws IOException,
InterruptedException {
- return reader.getCurrentKV().getValues().iterator().next();
+ return reader.getCurrentValues().iterator().next();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 1ba76f6..6c95ba8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -227,8 +227,8 @@ implements LogicalIOProcessor {
public boolean more() throws IOException {
boolean more = in.next();
if(more) {
- currentKey = in.getCurrentKV().getKey();
- currentValues = in.getCurrentKV().getValues().iterator();
+ currentKey = in.getCurrentKey();
+ currentValues = in.getCurrentValues().iterator();
} else {
currentKey = null;
currentValues = null;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
index 9c6b380..fb14e9a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
@@ -28,9 +28,8 @@ import org.apache.tez.runtime.api.Reader;
* Example usage
* <code>
* while (kvReader.next()) {
- * KVRecord kvRecord = getCurrentKV();
- * Object key = kvRecord.getKey();
- * Iterable values = kvRecord.getValues();
+ * Object key = kvReader.getKey();
+ * Iterable<Object> values = kvReader.getValues();
* </code>
*
*/
@@ -45,37 +44,16 @@ public interface KVReader extends Reader {
*/
public boolean next() throws IOException;
+
/**
- * Return the current key/value(s) pair. Use moveToNext() to advance.
+ * Returns the current key
* @return
- * @throws IOException
*/
- public KVRecord getCurrentKV() throws IOException;
+ public Object getCurrentKey() throws IOException;
- // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
-
- // TODO NEWTEZ KVRecord which does not need to return a list!
- // TODO NEWTEZ Parameterize this
/**
- * Represents a key and an associated set of values
- *
+ * Returns an Iterable view of the values associated with the current key
+ * @return
*/
- public static class KVRecord {
-
- private Object key;
- private Iterable<Object> values;
-
- public KVRecord(Object key, Iterable<Object> values) {
- this.key = key;
- this.values = values;
- }
-
- public Object getKey() {
- return this.key;
- }
-
- public Iterable<Object> getValues() {
- return this.values;
- }
- }
+ public Iterable<Object> getCurrentValues() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 675d90d..51c0d5e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -123,11 +123,15 @@ public class BroadcastKVReader<K, V> implements KVReader {
}
}
- @SuppressWarnings("unchecked")
@Override
- public KVRecord getCurrentKV() throws IOException {
+ public Object getCurrentKey() throws IOException {
+ return (Object) key;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Iterable<Object> getCurrentValues() throws IOException {
this.valueIterator.setValue(value);
- return new KVRecord((Object)key, (Iterable<Object>)this.valueIterable);
+ return (Iterable<Object>) this.valueIterable;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9fbc8d37/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 771ac1b..527f8e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -137,10 +137,13 @@ public class ShuffledMergedInput implements LogicalInput {
return vIter.moveToNext();
}
+ public Object getCurrentKey() throws IOException {
+ return vIter.getKey();
+ }
+
@SuppressWarnings("unchecked")
- @Override
- public KVRecord getCurrentKV() {
- return new KVRecord(vIter.getKey(), vIter.getValues());
+ public Iterable<Object> getCurrentValues() throws IOException {
+ return vIter.getValues();
}
};
}
[2/2] git commit: TEZ-508. Add interfaces for KeyValueReader and
KeyValuesReader. (sseth)
Posted by ss...@apache.org.
TEZ-508. Add interfaces for KeyValueReader and KeyValuesReader. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/034ca0a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/034ca0a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/034ca0a9
Branch: refs/heads/master
Commit: 034ca0a95175eebaec4d2a470027ed85b2051836
Parents: 9fbc8d3
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 27 12:11:51 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 27 12:11:51 2013 -0700
----------------------------------------------------------------------
.../processor/FilterByWordInputProcessor.java | 10 ++--
.../processor/FilterByWordOutputProcessor.java | 10 ++--
.../org/apache/tez/mapreduce/input/MRInput.java | 62 +++-----------------
.../apache/tez/mapreduce/output/MROutput.java | 6 +-
.../mapreduce/processor/map/MapProcessor.java | 22 +++----
.../processor/reduce/ReduceProcessor.java | 18 +++---
.../tez/runtime/library/api/KVReader.java | 59 -------------------
.../tez/runtime/library/api/KVWriter.java | 40 -------------
.../tez/runtime/library/api/KeyValueReader.java | 60 +++++++++++++++++++
.../tez/runtime/library/api/KeyValueWriter.java | 40 +++++++++++++
.../runtime/library/api/KeyValuesReader.java | 59 +++++++++++++++++++
.../broadcast/input/BroadcastKVReader.java | 60 +++----------------
.../broadcast/output/FileBasedKVWriter.java | 4 +-
.../library/input/ShuffledMergedInput.java | 6 +-
.../library/input/ShuffledUnorderedKVInput.java | 4 +-
.../library/output/InMemorySortedOutput.java | 4 +-
.../library/output/OnFileSortedOutput.java | 6 +-
.../library/output/OnFileUnorderedKVOutput.java | 4 +-
.../output/TestOnFileUnorderedKVOutput.java | 4 +-
19 files changed, 224 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index c9517aa..8e4dea9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -36,8 +36,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
public class FilterByWordInputProcessor implements LogicalIOProcessor {
@@ -107,12 +107,12 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
}
}
- KVReader kvReader = mrInput.getReader();
- KVWriter kvWriter = kvOutput.getWriter();
+ KeyValueReader kvReader = mrInput.getReader();
+ KeyValueWriter kvWriter = kvOutput.getWriter();
while (kvReader.next()) {
Object key = kvReader.getCurrentKey();
- Object val = kvReader.getCurrentValues().iterator().next();
+ Object val = kvReader.getCurrentValue();
Text valText = (Text) val;
String readVal = valText.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index ac1101d..e56e2a7 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -30,8 +30,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
@@ -83,11 +83,11 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor {
ShuffledUnorderedKVInput kvInput = (ShuffledUnorderedKVInput) li;
MROutput mrOutput = (MROutput) lo;
- KVReader kvReader = kvInput.getReader();
- KVWriter kvWriter = mrOutput.getWriter();
+ KeyValueReader kvReader = kvInput.getReader();
+ KeyValueWriter kvWriter = mrOutput.getWriter();
while (kvReader.next()) {
Object key = kvReader.getCurrentKey();
- Object value = kvReader.getCurrentValues().iterator().next();
+ Object value = kvReader.getCurrentValue();
kvWriter.write(key, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 0b4ae7f..ef0de5c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,7 +18,6 @@
package org.apache.tez.mapreduce.input;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -39,9 +38,9 @@ import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
@@ -53,7 +52,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
import com.google.common.base.Preconditions;
@@ -167,7 +166,7 @@ public class MRInput implements LogicalInput {
}
@Override
- public KVReader getReader() throws IOException {
+ public KeyValueReader getReader() throws IOException {
Preconditions
.checkState(recordReaderCreated == false,
"Only a single instance of record reader can be created for this input.");
@@ -227,46 +226,7 @@ public class MRInput implements LogicalInput {
private TaskAttemptContext createTaskAttemptContext() {
return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
}
-
-
- private static class SimpleValueIterator implements Iterator<Object> {
-
- private Object value;
-
- public void setValue(Object value) {
- this.value = value;
- }
-
- public boolean hasNext() {
- return value != null;
- }
-
- public Object next() {
- Object value = this.value;
- this.value = null;
- return value;
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class SimpleIterable implements Iterable<Object> {
- private final Iterator<Object> iterator;
- public SimpleIterable(Iterator<Object> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<Object> iterator() {
- return iterator;
- }
- }
-
-
-
@SuppressWarnings("unchecked")
private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
throws IOException {
@@ -373,14 +333,11 @@ public class MRInput implements LogicalInput {
return allTaskSplitMetaInfo;
}
- private class MRInputKVReader implements KVReader {
+ private class MRInputKVReader implements KeyValueReader {
Object key;
Object value;
- private SimpleValueIterator valueIterator = new SimpleValueIterator();
- private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
private final boolean localNewApi;
MRInputKVReader() {
@@ -432,19 +389,18 @@ public class MRInput implements LogicalInput {
return key;
}
}
-
+
@Override
- public Iterable<Object> getCurrentValues() throws IOException {
+ public Object getCurrentValue() throws IOException {
if (localNewApi) {
try {
- valueIterator.setValue(newRecordReader.getCurrentValue());
+ return newRecordReader.getCurrentValue();
} catch (InterruptedException e) {
- throw new IOException("Interrupted while fetching next value(s)", e);
+ throw new IOException("Interrupted while fetching next value", e);
}
} else {
- valueIterator.setValue(value);
+ return value;
}
- return valueIterable;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 11184e4..f250a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -35,7 +35,7 @@ import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
public class MROutput implements LogicalOutput {
@@ -235,8 +235,8 @@ public class MROutput implements LogicalOutput {
}
@Override
- public KVWriter getWriter() throws IOException {
- return new KVWriter() {
+ public KeyValueWriter getWriter() throws IOException {
+ return new KeyValueWriter() {
private final boolean useNewWriter = useNewApi;
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 54f6da9..890f1fa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -48,8 +48,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -113,7 +113,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
}
}
- KVWriter kvWriter = null;
+ KeyValueWriter kvWriter = null;
if (!(out instanceof OnFileSortedOutput)) {
kvWriter = ((MROutput)out).getWriter();
} else {
@@ -133,7 +133,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
final JobConf job,
final MRTaskReporter reporter,
final MRInputLegacy input,
- final KVWriter output
+ final KeyValueWriter output
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
@@ -156,7 +156,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private void runNewMapper(final JobConf job,
MRTaskReporter reporter,
final MRInputLegacy in,
- KVWriter out
+ KeyValueWriter out
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
@@ -206,7 +206,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private static class NewRecordReader extends
org.apache.hadoop.mapreduce.RecordReader {
private final MRInput in;
- private KVReader reader;
+ private KeyValueReader reader;
private NewRecordReader(MRInput in) throws IOException {
this.in = in;
@@ -235,7 +235,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
@Override
public Object getCurrentValue() throws IOException,
InterruptedException {
- return reader.getCurrentValues().iterator().next();
+ return reader.getCurrentValue();
}
@Override
@@ -299,9 +299,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private static class OldOutputCollector
implements OutputCollector {
- private final KVWriter output;
+ private final KeyValueWriter output;
- OldOutputCollector(KVWriter output) {
+ OldOutputCollector(KeyValueWriter output) {
this.output = output;
}
@@ -312,9 +312,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
private class NewOutputCollector
extends org.apache.hadoop.mapreduce.RecordWriter {
- private final KVWriter out;
+ private final KeyValueWriter out;
- NewOutputCollector(KVWriter out) throws IOException {
+ NewOutputCollector(KeyValueWriter out) throws IOException {
this.out = out;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 6c95ba8..0e41b0b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -45,8 +45,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
@@ -130,9 +130,9 @@ implements LogicalIOProcessor {
throw new IOException("Illegal input to reduce: " + in.getClass());
}
ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
- KVReader kvReader = shuffleInput.getReader();
+ KeyValuesReader kvReader = shuffleInput.getReader();
- KVWriter kvWriter = null;
+ KeyValueWriter kvWriter = null;
if((out instanceof MROutput)) {
kvWriter = ((MROutput) out).getWriter();
} else if ((out instanceof OnFileSortedOutput)) {
@@ -162,11 +162,11 @@ implements LogicalIOProcessor {
void runOldReducer(JobConf job,
final MRTaskReporter reporter,
- KVReader input,
+ KeyValuesReader input,
RawComparator comparator,
Class keyClass,
Class valueClass,
- final KVWriter output) throws IOException, InterruptedException {
+ final KeyValueWriter output) throws IOException, InterruptedException {
Reducer reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
@@ -210,12 +210,12 @@ implements LogicalIOProcessor {
private static class ReduceValuesIterator<KEY,VALUE>
implements Iterator<VALUE> {
private Counter reduceInputValueCounter;
- private KVReader in;
+ private KeyValuesReader in;
private Progressable reporter;
private Object currentKey;
private Iterator<Object> currentValues;
- public ReduceValuesIterator (KVReader in,
+ public ReduceValuesIterator (KeyValuesReader in,
Progressable reporter,
Counter reduceInputValueCounter)
throws IOException {
@@ -268,7 +268,7 @@ implements LogicalIOProcessor {
RawComparator comparator,
Class keyClass,
Class valueClass,
- final KVWriter out
+ final KeyValueWriter out
) throws IOException,InterruptedException,
ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
deleted file mode 100644
index fb14e9a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.api;
-
-import java.io.IOException;
-
-import org.apache.tez.runtime.api.Reader;
-
-/**
- * A key/value(s) pair based {@link Reader}.
- *
- * Example usage
- * <code>
- * while (kvReader.next()) {
- * Object key = kvReader.getKey();
- * Iterable<Object> values = kvReader.getValues();
- * </code>
- *
- */
-public interface KVReader extends Reader {
-
- /**
- * Moves to the next key/values(s) pair
- *
- * @return true if another key/value(s) pair exists, false if there are no more.
- * @throws IOException
- * if an error occurs
- */
- public boolean next() throws IOException;
-
-
- /**
- * Returns the current key
- * @return
- */
- public Object getCurrentKey() throws IOException;
-
- /**
- * Returns an Iterable view of the values associated with the current key
- * @return
- */
- public Iterable<Object> getCurrentValues() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
deleted file mode 100644
index ff952ed..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.api;
-
-import java.io.IOException;
-
-import org.apache.tez.runtime.api.Writer;
-
-/**
- * A key/value(s) pair based {@link Writer}
- */
-public interface KVWriter extends Writer {
- /**
- * Writes a key/value pair.
- *
- * @param key
- * the key to write
- * @param value
- * the value to write
- * @throws IOException
- * if an error occurs
- */
- public void write(Object key, Object value) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
new file mode 100644
index 0000000..ad327b4
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage
+ * <code>
+ * while (kvReader.next()) {
+ * Object key = kvReader.getCurrentKey();
+ * Object value = kvReader.getCurrentValue();
+ * </code>
+ *
+ */
+public interface KeyValueReader extends Reader {
+
+ /**
+ * Moves to the next key/values(s) pair
+ *
+ * @return true if another key/value(s) pair exists, false if there are no more.
+ * @throws IOException
+ * if an error occurs
+ */
+ public boolean next() throws IOException;
+
+
+ /**
+ * Returns the current key
+ * @return
+ */
+ public Object getCurrentKey() throws IOException;
+
+
+ /**
+ * @return the current value
+ * @throws IOException
+ */
+ public Object getCurrentValue() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
new file mode 100644
index 0000000..235f361
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value(s) pair based {@link Writer}
+ */
+public interface KeyValueWriter extends Writer {
+ /**
+ * Writes a key/value pair.
+ *
+ * @param key
+ * the key to write
+ * @param value
+ * the value to write
+ * @throws IOException
+ * if an error occurs
+ */
+ public void write(Object key, Object value) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
new file mode 100644
index 0000000..f300e2a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage
+ * <code>
+ * while (kvReader.next()) {
+ * Object key = kvReader.getCurrentKey();
+ * Iterable<Object> values = kvReader.getCurrentValues();
+ * </code>
+ *
+ */
+public interface KeyValuesReader extends Reader {
+
+ /**
+ * Moves to the next key/values(s) pair
+ *
+ * @return true if another key/value(s) pair exists, false if there are no more.
+ * @throws IOException
+ * if an error occurs
+ */
+ public boolean next() throws IOException;
+
+
+ /**
+ * Returns the current key
+ * @return
+ */
+ public Object getCurrentKey() throws IOException;
+
+ /**
+ * Returns an Iterable view of the values associated with the current key
+ * @return
+ */
+ public Iterable<Object> getCurrentValues() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 51c0d5e..070f902 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -19,7 +19,6 @@
package org.apache.tez.runtime.library.broadcast.input;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,15 +29,15 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-public class BroadcastKVReader<K, V> implements KVReader {
+public class BroadcastKVReader<K, V> implements KeyValueReader {
private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
@@ -53,9 +52,6 @@ public class BroadcastKVReader<K, V> implements KVReader {
private final DataInputBuffer keyIn;
private final DataInputBuffer valIn;
- private final SimpleValueIterator valueIterator;
- private final SimpleIterable valueIterable;
-
private K key;
private V value;
@@ -89,9 +85,6 @@ public class BroadcastKVReader<K, V> implements KVReader {
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(valIn);
-
- this.valueIterator = new SimpleValueIterator();
- this.valueIterable = new SimpleIterable(this.valueIterator);
}
// TODO NEWTEZ Maybe add an interface to check whether next will block.
@@ -127,11 +120,10 @@ public class BroadcastKVReader<K, V> implements KVReader {
public Object getCurrentKey() throws IOException {
return (Object) key;
}
-
- @SuppressWarnings("unchecked")
- public Iterable<Object> getCurrentValues() throws IOException {
- this.valueIterator.setValue(value);
- return (Iterable<Object>) this.valueIterable;
+
+ @Override
+ public Object getCurrentValue() throws IOException {
+ return value;
}
/**
@@ -194,42 +186,4 @@ public class BroadcastKVReader<K, V> implements KVReader {
fetchedInput.getSize(), codec, null);
}
}
-
-
-
- // TODO NEWTEZ Move this into a common class. Also used in MRInput
- private class SimpleValueIterator implements Iterator<V> {
-
- private V value;
-
- public void setValue(V value) {
- this.value = value;
- }
-
- public boolean hasNext() {
- return value != null;
- }
-
- public V next() {
- V value = this.value;
- this.value = null;
- return value;
- }
-
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private class SimpleIterable implements Iterable<V> {
- private final Iterator<V> iterator;
- public SimpleIterable(Iterator<V> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public Iterator<V> iterator() {
- return iterator;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 7d33e63..9941de0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -40,7 +40,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-public class FileBasedKVWriter implements KVWriter {
+public class FileBasedKVWriter implements KeyValueWriter {
private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 527f8e1..da152b8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -32,7 +32,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.ValuesIterator;
import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
@@ -121,7 +121,7 @@ public class ShuffledMergedInput implements LogicalInput {
* @return a KVReader over the sorted input.
*/
@Override
- public KVReader getReader() throws IOException {
+ public KeyValuesReader getReader() throws IOException {
if (rawIter == null) {
try {
waitForInputReady();
@@ -130,7 +130,7 @@ public class ShuffledMergedInput implements LogicalInput {
throw new IOException("Interrupted while waiting for input ready", e);
}
}
- return new KVReader() {
+ return new KeyValuesReader() {
@Override
public boolean next() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 40eff70..b6e17b6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -28,7 +28,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
@@ -63,7 +63,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
}
@Override
- public KVReader getReader() throws Exception {
+ public KeyValueReader getReader() throws Exception {
return this.kvReader;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
index 2ec6b2a..2a2872c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
@@ -27,7 +27,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.Writer;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;
/**
@@ -52,7 +52,7 @@ public class InMemorySortedOutput implements LogicalOutput {
@Override
public Writer getWriter() throws IOException {
- return new KVWriter() {
+ return new KeyValueWriter() {
@Override
public void write(Object key, Object value) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 42e1eeb..5415053 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -30,7 +30,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
@@ -68,8 +68,8 @@ public class OnFileSortedOutput implements LogicalOutput {
}
@Override
- public KVWriter getWriter() throws IOException {
- return new KVWriter() {
+ public KeyValueWriter getWriter() throws IOException {
+ return new KeyValueWriter() {
@Override
public void write(Object key, Object value) throws IOException {
sorter.write(key, value);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 93c00d3..db4085c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -29,7 +29,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -55,7 +55,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
- public KVWriter getWriter() throws Exception {
+ public KeyValueWriter getWriter() throws Exception {
return kvWriter;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index ff9afbd..c7626fd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -48,7 +48,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.testutils.KVDataGen;
@@ -125,7 +125,7 @@ public class TestOnFileUnorderedKVOutput {
events = kvOutput.initialize(outputContext);
assertTrue(events != null && events.size() == 0);
- KVWriter kvWriter = kvOutput.getWriter();
+ KeyValueWriter kvWriter = kvOutput.getWriter();
List<KVPair> data = KVDataGen.generateTestData(true);
for (KVPair kvp : data) {
kvWriter.write(kvp.getKey(), kvp.getvalue());