You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:24 UTC
[18/50] [abbrv] git commit: TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh)
TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d1303fa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d1303fa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d1303fa
Branch: refs/heads/branch-0.5
Commit: 7d1303fa606e700bd4d4b2a122a73a5badbbc889
Parents: edb841c
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 12 15:18:05 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 12 15:18:05 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../examples/GroupByOrderByMRRTest.java | 283 +++++++++++++------
2 files changed, 205 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59be260..3198323 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@ ALL CHANGES:
TEZ-1569. Add tests for preemption
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
TEZ-1524. Resolve user group information only if ACLs are enabled.
+ TEZ-1581. GroupByOrderByMRRTest no longer functional.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
index 939bea0..393faea 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -18,34 +18,54 @@
package org.apache.tez.mapreduce.examples;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.StringTokenizer;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.client.MRTezClient;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
/**
* Simple example that does a GROUP BY ORDER BY in an MRR job
@@ -94,7 +114,7 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
- String empName = "";
+ String empName;
String deptName = "";
if (itr.hasMoreTokens()) {
empName = itr.nextToken();
@@ -149,25 +169,141 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
}
}
+ private static DAG createDAG(Configuration conf, Map<String, LocalResource> commonLocalResources,
+ Path stagingDir, String inputPath, String outputPath, boolean useMRSettings)
+ throws Exception {
+
+ Configuration mapStageConf = new JobConf(conf);
+ mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
+ MyMapper.class.getName());
+
+ MRHelpers.translateMRConfToTez(mapStageConf);
+
+ Configuration iReduceStageConf = new JobConf(conf);
+ // TODO replace with auto-reduce parallelism
+ iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
+ iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+ MyGroupByReducer.class.getName());
+ iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+ iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+ IntWritable.class.getName());
+ iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
+ MRHelpers.translateMRConfToTez(iReduceStageConf);
+
+ Configuration finalReduceConf = new JobConf(conf);
+ finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+ MyOrderByNoOpReducer.class.getName());
+ finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+ MRHelpers.translateMRConfToTez(finalReduceConf);
+
+ MRHelpers.configureMRApiUsage(mapStageConf);
+ MRHelpers.configureMRApiUsage(iReduceStageConf);
+ MRHelpers.configureMRApiUsage(finalReduceConf);
+
+ List<Vertex> vertices = new ArrayList<Vertex>();
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+ mapStageConf.writeXml(outputStream);
+ String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
+ mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+ TextInputFormat.class.getName());
+ mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+ mapStageConf.setBoolean("mapred.mapper.new-api", true);
+ DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(
+ mapStageConf, stagingDir, true);
+
+ Vertex mapVertex;
+ ProcessorDescriptor mapProcessorDescriptor =
+ ProcessorDescriptor.create(MapProcessor.class.getName())
+ .setUserPayload(
+ TezUtils.createUserPayloadFromConf(mapStageConf))
+ .setHistoryText(mapStageHistoryText);
+ if (!useMRSettings) {
+ mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
+ } else {
+ mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
+ MRHelpers.getResourceForMRMapper(mapStageConf));
+ mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+ }
+ mapVertex.addTaskLocalFiles(commonLocalResources)
+ .addDataSource("MRInput", dsd);
+ vertices.add(mapVertex);
+
+ ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+ iReduceStageConf.writeXml(iROutputStream);
+ String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
+
+ ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
+ ReduceProcessor.class.getName())
+ .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+ .setHistoryText(iReduceStageHistoryText);
+
+ Vertex intermediateVertex;
+ if (!useMRSettings) {
+ intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, 1);
+ } else {
+ intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor,
+ 1, MRHelpers.getResourceForMRReducer(iReduceStageConf));
+ intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
+ }
+ intermediateVertex.addTaskLocalFiles(commonLocalResources);
+ vertices.add(intermediateVertex);
+
+ ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+ finalReduceConf.writeXml(finalReduceOutputStream);
+ String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
+ UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+ Vertex finalReduceVertex;
+
+ ProcessorDescriptor finalReduceProcessorDescriptor =
+ ProcessorDescriptor.create(
+ ReduceProcessor.class.getName())
+ .setUserPayload(finalReducePayload)
+ .setHistoryText(finalReduceStageHistoryText);
+ if (!useMRSettings) {
+ finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
+ } else {
+ finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
+ MRHelpers.getResourceForMRReducer(finalReduceConf));
+ finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+ }
+ finalReduceVertex.addTaskLocalFiles(commonLocalResources);
+ finalReduceVertex.addDataSink("MROutput",
+ MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
+ .build());
+ vertices.add(finalReduceVertex);
+
+ DAG dag = DAG.create("groupbyorderbymrrtest");
+ for (Vertex v : vertices) {
+ dag.addVertex(v);
+ }
+
+ OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
+ .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+ HashPartitioner.class.getName()).setFromConfiguration(conf)
+ .configureInput().useLegacyInput().done().build();
+ dag.addEdge(
+ Edge.create(dag.getVertex("initialmap"), dag.getVertex("ireduce1"),
+ edgeConf1.createDefaultEdgeProperty()));
+
+ OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
+ .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+ HashPartitioner.class.getName()).setFromConfiguration(conf)
+ .configureInput().useLegacyInput().done().build();
+ dag.addEdge(
+ Edge.create(dag.getVertex("ireduce1"), dag.getVertex("finalreduce"),
+ edgeConf2.createDefaultEdgeProperty()));
+
+ return dag;
+ }
+
+
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
- // Configure intermediate reduces
- conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-
- // Set reducer class for intermediate reduce
- conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
- "mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class);
- // Set reducer output key class
- conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
- "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
- // Set reducer output value class
- conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
- "mapreduce.map.output.value.class"), Text.class, Object.class);
- conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
- "mapreduce.job.reduces"), 2);
-
String[] otherArgs = new GenericOptionsParser(conf, args).
getRemainingArgs();
if (otherArgs.length != 2) {
@@ -176,66 +312,55 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
return 2;
}
- @SuppressWarnings("deprecation")
- Job job = new Job(conf, "groupbyorderbymrrtest");
-
- job.setJarByClass(GroupByOrderByMRRTest.class);
-
- // Configure map
- job.setMapperClass(MyMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- // Configure reduce
- job.setReducerClass(MyOrderByNoOpReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setNumReduceTasks(1);
-
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
- job.submit();
- JobID jobId = job.getJobID();
- ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
-
- DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf), null);
- DAGStatus dagStatus;
- String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
- while (true) {
- dagStatus = dagClient.getDAGStatus(null);
- if(dagStatus.getState() == DAGStatus.State.RUNNING ||
- dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
- dagStatus.getState() == DAGStatus.State.KILLED ||
- dagStatus.getState() == DAGStatus.State.ERROR) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // continue;
- }
+ String inputPath = otherArgs[0];
+ String outputPath = otherArgs[1];
+
+ UserGroupInformation.setConfiguration(conf);
+
+ TezConfiguration tezConf = new TezConfiguration(conf);
+ FileSystem fs = FileSystem.get(conf);
+
+ if (fs.exists(new Path(outputPath))) {
+ throw new FileAlreadyExistsException("Output directory "
+ + outputPath + " already exists");
}
- while (dagStatus.getState() == DAGStatus.State.RUNNING) {
- try {
- ExampleDriver.printDAGStatus(dagClient, vNames);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // continue;
- }
- dagStatus = dagClient.getDAGStatus(null);
- } catch (TezException e) {
- LOG.fatal("Failed to get application progress. Exiting");
+ Map<String, LocalResource> localResources =
+ new TreeMap<String, LocalResource>();
+
+ String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+ TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
+ Long.toString(System.currentTimeMillis());
+ Path stagingDir = new Path(stagingDirStr);
+ FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+ pathFs.mkdirs(new Path(stagingDirStr));
+
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+ stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
+
+ TezClient tezClient = TezClient.create("groupbyorderbymrrtest", tezConf);
+ tezClient.start();
+
+ LOG.info("Submitting groupbyorderbymrrtest DAG as a new Tez Application");
+
+ try {
+ DAG dag = createDAG(conf, localResources, stagingDir, inputPath, outputPath, true);
+
+ tezClient.waitTillReady();
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+
+ DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.error("groupbyorderbymrrtest failed, state=" + dagStatus.getState()
+ + ", diagnostics=" + dagStatus.getDiagnostics());
return -1;
}
+ LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+ return 0;
+ } finally {
+ tezClient.stop();
}
-
- ExampleDriver.printDAGStatus(dagClient, vNames);
- LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
- return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
}
public static void main(String[] args) throws Exception {