You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:01 UTC

[16/25] git commit: TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh)

TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dd0cb4d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dd0cb4d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dd0cb4d

Branch: refs/heads/TEZ-8
Commit: 9dd0cb4d8e933d5f57b3d9ae532e7167978aed68
Parents: 5e5683a
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 12 14:25:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 12 14:25:17 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../examples/TestOrderedWordCount.java          | 61 +++++++++++++++-----
 2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 73a3671..f71c2e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ ALL CHANGES:
   TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
   TEZ-1578. Remove TeraSort from Tez codebase.
   TEZ-1569. Add tests for preemption
+  TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 2c5db10..a36d1d2 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -149,7 +149,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
   public DAG createDAG(FileSystem fs, Configuration conf,
       Map<String, LocalResource> commonLocalResources, Path stagingDir,
       int dagIndex, String inputPath, String outputPath,
-      boolean generateSplitsInClient) throws Exception {
+      boolean generateSplitsInClient,
+      boolean useMRSettings,
+      int intermediateNumReduceTasks) throws Exception {
 
     Configuration mapStageConf = new JobConf(conf);
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -196,32 +198,62 @@ public class TestOrderedWordCount extends Configured implements Tool {
       dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
     }
 
-    Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create(
-        MapProcessor.class.getName()).setUserPayload(
-        TezUtils.createUserPayloadFromConf(mapStageConf))
-        .setHistoryText(mapStageHistoryText)).addTaskLocalFiles(commonLocalResources);
-    mapVertex.addDataSource("MRInput", dsd);
+    Vertex mapVertex;
+    ProcessorDescriptor mapProcessorDescriptor =
+        ProcessorDescriptor.create(MapProcessor.class.getName())
+            .setUserPayload(
+                TezUtils.createUserPayloadFromConf(mapStageConf))
+            .setHistoryText(mapStageHistoryText);
+    if (!useMRSettings) {
+      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
+    } else {
+      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
+          MRHelpers.getResourceForMRMapper(mapStageConf));
+      mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+    }
+    mapVertex.addTaskLocalFiles(commonLocalResources)
+        .addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
 
     ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
     iReduceStageConf.writeXml(iROutputStream);
     String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
-    Vertex ivertex = Vertex.create("intermediate_reducer", ProcessorDescriptor.create(
+
+    ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
         ReduceProcessor.class.getName())
         .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
-        .setHistoryText(iReduceStageHistoryText), 2);
-    ivertex.addTaskLocalFiles(commonLocalResources);
-    vertices.add(ivertex);
+        .setHistoryText(iReduceStageHistoryText);
+
+    Vertex intermediateVertex;
+    if (!useMRSettings) {
+      intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor,
+          intermediateNumReduceTasks);
+    } else {
+      intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor,
+          intermediateNumReduceTasks, MRHelpers.getResourceForMRReducer(iReduceStageConf));
+      intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
+    }
+    intermediateVertex.addTaskLocalFiles(commonLocalResources);
+    vertices.add(intermediateVertex);
 
     ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
     finalReduceConf.writeXml(finalReduceOutputStream);
     String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
     UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
-    Vertex finalReduceVertex = Vertex.create("finalreduce",
+    Vertex finalReduceVertex;
+
+    ProcessorDescriptor finalReduceProcessorDescriptor =
         ProcessorDescriptor.create(
             ReduceProcessor.class.getName())
             .setUserPayload(finalReducePayload)
-            .setHistoryText(finalReduceStageHistoryText), 1);
+            .setHistoryText(finalReduceStageHistoryText);
+    if (!useMRSettings) {
+      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
+    } else {
+      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
+          MRHelpers.getResourceForMRReducer(finalReduceConf));
+      finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+    }
     finalReduceVertex.addTaskLocalFiles(commonLocalResources);
     finalReduceVertex.addDataSink("MROutput",
         MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
@@ -283,6 +315,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
         * 1000;
 
     boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
+    boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
+    // TODO needs to use auto reduce parallelism
+    int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
 
     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
@@ -371,7 +406,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
         
         DAG dag = instance.createDAG(fs, conf, localResources,
             stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient);
+            generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
 
         boolean doPreWarm = dagIndex == 1 && useTezSession
             && conf.getBoolean("PRE_WARM_SESSION", true);