You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:39 UTC

[33/50] [abbrv] git commit: TEZ-1607. support mr envs in mrrsleep and testorderedwordcount (bikas)

TEZ-1607. support mr envs in mrrsleep and testorderedwordcount (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09bd44e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09bd44e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09bd44e7

Branch: refs/heads/branch-0.5
Commit: 09bd44e76c3e4fd10a474197b7db0f0e1c78541b
Parents: 97528e9
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Sep 21 15:33:45 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Sep 21 15:33:45 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                           |  1 +
 .../apache/tez/mapreduce/examples/MRRSleepJob.java    | 14 ++++++++++----
 .../tez/mapreduce/examples/TestOrderedWordCount.java  |  9 +++++++++
 3 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/09bd44e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f069959..53409fb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ ALL CHANGES:
   TEZ-1524. Resolve user group information only if ACLs are enabled.
   TEZ-1581. GroupByOrderByMRRTest no longer functional.
   TEZ-1157. Optimize broadcast shuffle to download data only once per host. 
+  TEZ-1607. support mr envs in mrrsleep and testorderedwordcount
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/09bd44e7/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index de8a1ad..bbb4d64 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -524,16 +524,21 @@ public class MRRSleepJob extends Configured implements Tool {
     UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf(mapStageConf);
     int numTasks = generateSplitsInAM ? -1 : numMapper;
 
+    Map<String, String> mapEnv = Maps.newHashMap();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, mapEnv, true);
+    Map<String, String> reduceEnv = Maps.newHashMap();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);
+
     Vertex mapVertex = Vertex.create("map", ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks,
         MRHelpers.getResourceForMRMapper(mapStageConf));
     mapVertex.addTaskLocalFiles(commonLocalResources)
         .addDataSource("MRInput", dataSource)
-        .setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+        .setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf)).setTaskEnvironment(mapEnv);
     vertices.add(mapVertex);
 
     if (iReduceStagesCount > 0
-        && numIReducer > 0) {
+        && numIReducer > 0) {      
       for (int i = 0; i < iReduceStagesCount; ++i) {
         Configuration iconf =
             intermediateReduceStageConfs[i];
@@ -544,7 +549,7 @@ public class MRRSleepJob extends Configured implements Tool {
             MRHelpers.getResourceForMRReducer(intermediateReduceStageConfs[i]));
         ivertex.addTaskLocalFiles(commonLocalResources)
             .setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(
-                intermediateReduceStageConfs[i]));
+                intermediateReduceStageConfs[i])).setTaskEnvironment(reduceEnv);
         vertices.add(ivertex);
       }
     }
@@ -558,7 +563,8 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceVertex.addTaskLocalFiles(commonLocalResources)
           .addDataSink("MROutput", MROutputLegacy.createConfigBuilder(
               finalReduceConf, NullOutputFormat.class).build())
-          .setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+          .setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf))
+          .setTaskEnvironment(reduceEnv);
       vertices.add(finalReduceVertex);
     } else {
       // Map only job

http://git-wip-us.apache.org/repos/asf/tez/blob/09bd44e7/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index a36d1d2..e519085 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 
 /**
  * An MRR job built on top of word count to return words sorted by
@@ -198,6 +199,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
       dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
     }
 
+    Map<String, String> mapEnv = Maps.newHashMap();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, mapEnv, true);
+    Map<String, String> reduceEnv = Maps.newHashMap();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(mapStageConf, reduceEnv, false);
+
     Vertex mapVertex;
     ProcessorDescriptor mapProcessorDescriptor =
         ProcessorDescriptor.create(MapProcessor.class.getName())
@@ -210,6 +216,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
           MRHelpers.getResourceForMRMapper(mapStageConf));
       mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+      mapVertex.setTaskEnvironment(mapEnv);
     }
     mapVertex.addTaskLocalFiles(commonLocalResources)
         .addDataSource("MRInput", dsd);
@@ -232,6 +239,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor,
           intermediateNumReduceTasks, MRHelpers.getResourceForMRReducer(iReduceStageConf));
       intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
+      intermediateVertex.setTaskEnvironment(reduceEnv);
     }
     intermediateVertex.addTaskLocalFiles(commonLocalResources);
     vertices.add(intermediateVertex);
@@ -253,6 +261,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
           MRHelpers.getResourceForMRReducer(finalReduceConf));
       finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+      finalReduceVertex.setTaskEnvironment(reduceEnv);
     }
     finalReduceVertex.addTaskLocalFiles(commonLocalResources);
     finalReduceVertex.addDataSink("MROutput",