You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/28 07:37:58 UTC

git commit: TEZ-652. Make OrderedWordCount do grouping in the AM (bikas)

Updated Branches:
  refs/heads/master b3665f411 -> c4218172f


TEZ-652. Make OrderedWordCount do grouping in the AM (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c4218172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c4218172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c4218172

Branch: refs/heads/master
Commit: c4218172f16a2fe266d6e1560c5cdc6b1bf468bc
Parents: b3665f4
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Nov 27 22:37:05 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Nov 27 22:37:05 2013 -0800

----------------------------------------------------------------------
 .../mapreduce/examples/OrderedWordCount.java    | 19 ++++++++----
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 32 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c4218172/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index b1448ae..dbf7eb9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -156,8 +157,13 @@ public class OrderedWordCount {
         Text.class.getName());
     mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
         IntWritable.class.getName());
-    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-        TextInputFormat.class.getName());
+    if (generateSplitsInClient) {
+      mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+          TextInputFormat.class.getName());
+    } else {
+      mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, 
+          TezGroupedSplitsInputFormat.class.getName());
+    }
     mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
     mapStageConf.setBoolean("mapred.mapper.new-api", true);
 
@@ -206,8 +212,8 @@ public class OrderedWordCount {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload =
-        MRHelpers.createMRInputPayload(mapPayload, null);
+    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
+            TextInputFormat.class.getName());
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapPayload),
@@ -277,9 +283,10 @@ public class OrderedWordCount {
   }
 
   private static void printUsage() {
-    System.err.println("Usage: orderedwordcount <in> <out> [-generateSplitsInClient true/<false>]");
+    String options = " [-generateSplitsInClient true/<false>]";
+    System.err.println("Usage: orderedwordcount <in> <out>" + options);
     System.err.println("Usage (In Session Mode):"
-        + " orderedwordcount <in1> <out1> ... <inN> <outN> [-generateSplitsInClient true/<false>]");
+        + " orderedwordcount <in1> <out1> ... <inN> <outN>" + options);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c4218172/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 87b4032..90790b7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -702,6 +702,38 @@ public class MRHelpers {
    * should be passed as an argument to this method.
    */
   public static byte[] createMRInputPayloadWithGrouping(byte[] configurationBytes,
+      String inputFormatName) throws IOException {
+    Preconditions.checkArgument(configurationBytes != null,
+        "Configuration bytes must be specified");
+    Preconditions.checkArgument(inputFormatName != null, 
+        "InputFormat must be specified");
+    return createMRInputPayload(ByteString
+        .copyFrom(configurationBytes), null, inputFormatName);    
+  }
+
+  /**
+   * Called to specify that grouping of input splits be performed by Tez.
+   * The conf should have the input format class configuration 
+   * set to the TezGroupedSplitsInputFormat. The real input format class name 
+   * should be passed as an argument to this method.
+   */
+  public static byte[] createMRInputPayloadWithGrouping(Configuration conf,
+      String inputFormatName) throws IOException {
+    Preconditions
+        .checkArgument(conf != null, "Configuration must be specified");
+    Preconditions.checkArgument(inputFormatName != null, 
+        "InputFormat must be specified");
+    return createMRInputPayload(createByteStringFromConf(conf), 
+        null, inputFormatName);    
+  }
+
+  /**
+   * Called to specify that grouping of input splits be performed by Tez.
+   * The configurationBytes conf should have the input format class configuration 
+   * set to the TezGroupedSplitsInputFormat. The real input format class name 
+   * should be passed as an argument to this method.
+   */
+  public static byte[] createMRInputPayloadWithGrouping(byte[] configurationBytes,
       MRSplitsProto mrSplitsProto, String inputFormatName) throws IOException {
     Preconditions.checkArgument(configurationBytes != null,
         "Configuration bytes must be specified");