You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:22 UTC
[16/50] [abbrv] git commit: TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh)
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dd0cb4d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dd0cb4d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dd0cb4d
Branch: refs/heads/branch-0.5
Commit: 9dd0cb4d8e933d5f57b3d9ae532e7167978aed68
Parents: 5e5683a
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 12 14:25:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 12 14:25:17 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../examples/TestOrderedWordCount.java | 61 +++++++++++++++-----
2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 73a3671..f71c2e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ ALL CHANGES:
TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
TEZ-1578. Remove TeraSort from Tez codebase.
TEZ-1569. Add tests for preemption
+ TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/9dd0cb4d/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 2c5db10..a36d1d2 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -149,7 +149,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
public DAG createDAG(FileSystem fs, Configuration conf,
Map<String, LocalResource> commonLocalResources, Path stagingDir,
int dagIndex, String inputPath, String outputPath,
- boolean generateSplitsInClient) throws Exception {
+ boolean generateSplitsInClient,
+ boolean useMRSettings,
+ int intermediateNumReduceTasks) throws Exception {
Configuration mapStageConf = new JobConf(conf);
mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -196,32 +198,62 @@ public class TestOrderedWordCount extends Configured implements Tool {
dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
}
- Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create(
- MapProcessor.class.getName()).setUserPayload(
- TezUtils.createUserPayloadFromConf(mapStageConf))
- .setHistoryText(mapStageHistoryText)).addTaskLocalFiles(commonLocalResources);
- mapVertex.addDataSource("MRInput", dsd);
+ Vertex mapVertex;
+ ProcessorDescriptor mapProcessorDescriptor =
+ ProcessorDescriptor.create(MapProcessor.class.getName())
+ .setUserPayload(
+ TezUtils.createUserPayloadFromConf(mapStageConf))
+ .setHistoryText(mapStageHistoryText);
+ if (!useMRSettings) {
+ mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
+ } else {
+ mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
+ MRHelpers.getResourceForMRMapper(mapStageConf));
+ mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+ }
+ mapVertex.addTaskLocalFiles(commonLocalResources)
+ .addDataSource("MRInput", dsd);
vertices.add(mapVertex);
ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
iReduceStageConf.writeXml(iROutputStream);
String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
- Vertex ivertex = Vertex.create("intermediate_reducer", ProcessorDescriptor.create(
+
+ ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
ReduceProcessor.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
- .setHistoryText(iReduceStageHistoryText), 2);
- ivertex.addTaskLocalFiles(commonLocalResources);
- vertices.add(ivertex);
+ .setHistoryText(iReduceStageHistoryText);
+
+ Vertex intermediateVertex;
+ if (!useMRSettings) {
+ intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor,
+ intermediateNumReduceTasks);
+ } else {
+ intermediateVertex = Vertex.create("intermediate_reducer", iReduceProcessorDescriptor,
+ intermediateNumReduceTasks, MRHelpers.getResourceForMRReducer(iReduceStageConf));
+ intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
+ }
+ intermediateVertex.addTaskLocalFiles(commonLocalResources);
+ vertices.add(intermediateVertex);
ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
finalReduceConf.writeXml(finalReduceOutputStream);
String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
- Vertex finalReduceVertex = Vertex.create("finalreduce",
+ Vertex finalReduceVertex;
+
+ ProcessorDescriptor finalReduceProcessorDescriptor =
ProcessorDescriptor.create(
ReduceProcessor.class.getName())
.setUserPayload(finalReducePayload)
- .setHistoryText(finalReduceStageHistoryText), 1);
+ .setHistoryText(finalReduceStageHistoryText);
+ if (!useMRSettings) {
+ finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
+ } else {
+ finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
+ MRHelpers.getResourceForMRReducer(finalReduceConf));
+ finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+ }
finalReduceVertex.addTaskLocalFiles(commonLocalResources);
finalReduceVertex.addDataSink("MROutput",
MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
@@ -283,6 +315,9 @@ public class TestOrderedWordCount extends Configured implements Tool {
* 1000;
boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
+ boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
+ // TODO needs to use auto reduce parallelism
+ int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
if (((otherArgs.length%2) != 0)
|| (!useTezSession && otherArgs.length != 2)) {
@@ -371,7 +406,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
DAG dag = instance.createDAG(fs, conf, localResources,
stagingDir, dagIndex, inputPath, outputPath,
- generateSplitsInClient);
+ generateSplitsInClient, useMRSettings, intermediateNumReduceTasks);
boolean doPreWarm = dagIndex == 1 && useTezSession
&& conf.getBoolean("PRE_WARM_SESSION", true);