You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/28 07:37:58 UTC
git commit: TEZ-652. Make OrderedWordCount do grouping in the AM (bikas)
Updated Branches:
refs/heads/master b3665f411 -> c4218172f
TEZ-652. Make OrderedWordCount do grouping in the AM (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c4218172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c4218172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c4218172
Branch: refs/heads/master
Commit: c4218172f16a2fe266d6e1560c5cdc6b1bf468bc
Parents: b3665f4
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Nov 27 22:37:05 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Nov 27 22:37:05 2013 -0800
----------------------------------------------------------------------
.../mapreduce/examples/OrderedWordCount.java | 19 ++++++++----
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 32 ++++++++++++++++++++
2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c4218172/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index b1448ae..dbf7eb9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -156,8 +157,13 @@ public class OrderedWordCount {
Text.class.getName());
mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
IntWritable.class.getName());
- mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
- TextInputFormat.class.getName());
+ if (generateSplitsInClient) {
+ mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+ TextInputFormat.class.getName());
+ } else {
+ mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+ TezGroupedSplitsInputFormat.class.getName());
+ }
mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
mapStageConf.setBoolean("mapred.mapper.new-api", true);
@@ -206,8 +212,8 @@ public class OrderedWordCount {
List<Vertex> vertices = new ArrayList<Vertex>();
byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
- byte[] mapInputPayload =
- MRHelpers.createMRInputPayload(mapPayload, null);
+ byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
+ TextInputFormat.class.getName());
int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapPayload),
@@ -277,9 +283,10 @@ public class OrderedWordCount {
}
private static void printUsage() {
- System.err.println("Usage: orderedwordcount <in> <out> [-generateSplitsInClient true/<false>]");
+ String options = " [-generateSplitsInClient true/<false>]";
+ System.err.println("Usage: orderedwordcount <in> <out>" + options);
System.err.println("Usage (In Session Mode):"
- + " orderedwordcount <in1> <out1> ... <inN> <outN> [-generateSplitsInClient true/<false>]");
+ + " orderedwordcount <in1> <out1> ... <inN> <outN>" + options);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c4218172/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 87b4032..90790b7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -702,6 +702,38 @@ public class MRHelpers {
* should be passed as an argument to this method.
*/
public static byte[] createMRInputPayloadWithGrouping(byte[] configurationBytes,
+ String inputFormatName) throws IOException {
+ Preconditions.checkArgument(configurationBytes != null,
+ "Configuration bytes must be specified");
+ Preconditions.checkArgument(inputFormatName != null,
+ "InputFormat must be specified");
+ return createMRInputPayload(ByteString
+ .copyFrom(configurationBytes), null, inputFormatName);
+ }
+
+ /**
+ * Called to specify that grouping of input splits be performed by Tez
+ * The conf should have the input format class configuration
+ * set to the TezGroupedSplitsInputFormat. The real input format class name
+ * should be passed as an argument to this method.
+ */
+ public static byte[] createMRInputPayloadWithGrouping(Configuration conf,
+ String inputFormatName) throws IOException {
+ Preconditions
+ .checkArgument(conf != null, "Configuration must be specified");
+ Preconditions.checkArgument(inputFormatName != null,
+ "InputFormat must be specified");
+ return createMRInputPayload(createByteStringFromConf(conf),
+ null, inputFormatName);
+ }
+
+ /**
+ * Called to specify that grouping of input splits be performed by Tez
+ * The configurationBytes conf should have the input format class configuration
+ * set to the TezGroupedSplitsInputFormat. The real input format class name
+ * should be passed as an argument to this method.
+ */
+ public static byte[] createMRInputPayloadWithGrouping(byte[] configurationBytes,
MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
Preconditions.checkArgument(configurationBytes != null,
"Configuration bytes must be specified");