You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/13 08:09:10 UTC
tez git commit: TEZ-2351. Remove GroupByOrderbyMRRTest example from
tez-tests (Hitesh via Jeff Zhang)
Repository: tez
Updated Branches:
refs/heads/master faffae43c -> 7d0ae69e2
TEZ-2351. Remove GroupByOrderbyMRRTest example from tez-tests (Hitesh via Jeff Zhang)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d0ae69e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d0ae69e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d0ae69e
Branch: refs/heads/master
Commit: 7d0ae69e2aff1e76faa5c89a3789e35ac254035c
Parents: faffae4
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed May 13 14:08:24 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed May 13 14:08:24 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/examples/ExampleDriver.java | 11 +-
.../examples/GroupByOrderByMRRTest.java | 372 -------------------
3 files changed, 5 insertions(+), 379 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7d0ae69e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c52a6e..24a862d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2351. Remove GroupByOrderbyMRRTest example from tez-tests.
TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op.
TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable.
TEZ-1970. Fix javadoc warnings in SortMergeJoinExample.
http://git-wip-us.apache.org/repos/asf/tez/blob/7d0ae69e/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index d1bc46c..90d2825 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -46,8 +46,10 @@ public class ExampleDriver {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
- pgd.addClass("broadcastloadgen", BroadcastLoadGen.class, "Run a DAG to generate load for Broadcast Shuffle");
- pgd.addClass("rpcloadgen", RPCLoadGen.class, "Run a DAG to generate load for the task to AM RPC");
+ pgd.addClass("broadcastloadgen", BroadcastLoadGen.class,
+ "Run a DAG to generate load for Broadcast Shuffle");
+ pgd.addClass("rpcloadgen", RPCLoadGen.class,
+ "Run a DAG to generate load for the task to AM RPC");
pgd.addClass("wordcount", MapredWordCount.class,
"A map/reduce program that counts the words in the input files.");
pgd.addClass("mapredwordcount", MapredWordCount.class,
@@ -65,11 +67,6 @@ public class ExampleDriver {
pgd.addClass("join", Join.class,
"A job that effects a join over sorted, equally partitioned"
+ " datasets");
- pgd.addClass("groupbyorderbymrrtest", GroupByOrderByMRRTest.class,
- "A map-reduce-reduce program that does groupby-order by. Takes input"
- + " containing employee_name department name per line of input"
- + " and generates count of employees per department and"
- + " sorted on employee count");
pgd.addClass("mrrsleep", MRRSleepJob.class,
"MRR Sleep Job");
pgd.addClass("testorderedwordcount", TestOrderedWordCount.class,
http://git-wip-us.apache.org/repos/asf/tez/blob/7d0ae69e/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
deleted file mode 100644
index 8e88b14..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.output.MROutputLegacy;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-
-/**
- * Simple example that does a GROUP BY ORDER BY in an MRR job
- * Consider a query such as
- * Select DeptName, COUNT(*) as cnt FROM EmployeeTable
- * GROUP BY DeptName ORDER BY cnt;
- *
- * i.e. List all departments with count of employees in each department
- * and ordered based on department's employee count.
- *
- * Requires an Input file containing 2 strings per line in format of
- * <EmployeeName> <DeptName>
- *
- * For example, use the following:
- *
- * #/bin/bash
- *
- * i=1000000
- * j=1000
- *
- * id=0
- * while [[ "$id" -ne "$i" ]]
- * do
- * id=`expr $id + 1`
- * deptId=`expr $RANDOM % $j + 1`
- * deptName=`echo "ibase=10;obase=16;$deptId" | bc`
- * echo "$id O$deptName"
- * done
- *
- */
-public class GroupByOrderByMRRTest extends Configured implements Tool {
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupByOrderByMRRTest.class);
-
- /**
- * Mapper takes in a single line as input containing
- * employee name and department name and then
- * emits department name with count of 1
- */
- public static class MyMapper
- extends Mapper<Object, Text, Text, IntWritable> {
-
- private final static IntWritable one = new IntWritable(1);
- private final static Text word = new Text();
-
- public void map(Object key, Text value, Context context
- ) throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(value.toString());
- String empName;
- String deptName = "";
- if (itr.hasMoreTokens()) {
- empName = itr.nextToken();
- if (itr.hasMoreTokens()) {
- deptName = itr.nextToken();
- }
- if (!empName.isEmpty()
- && !deptName.isEmpty()) {
- word.set(deptName);
- context.write(word, one);
- }
- }
- }
- }
-
- /**
- * Intermediate reducer aggregates the total count per department.
- * It takes department name and count as input and emits the final
- * count per department name.
- */
- public static class MyGroupByReducer
- extends Reducer<Text, IntWritable, IntWritable, Text> {
- private IntWritable result = new IntWritable();
-
- public void reduce(Text key, Iterable<IntWritable> values,
- Context context
- ) throws IOException, InterruptedException {
-
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(result, key);
- }
- }
-
- /**
- * Shuffle ensures ordering based on count of employees per department
- * hence the final reducer is a no-op and just emits the department name
- * with the employee count per department.
- */
- public static class MyOrderByNoOpReducer
- extends Reducer<IntWritable, Text, Text, IntWritable> {
-
- public void reduce(IntWritable key, Iterable<Text> values,
- Context context
- ) throws IOException, InterruptedException {
- for (Text word : values) {
- context.write(word, key);
- }
- }
- }
-
- private static DAG createDAG(Configuration conf, Map<String, LocalResource> commonLocalResources,
- Path stagingDir, String inputPath, String outputPath, boolean useMRSettings)
- throws Exception {
-
- Configuration mapStageConf = new JobConf(conf);
- mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
- MyMapper.class.getName());
-
- MRHelpers.translateMRConfToTez(mapStageConf);
-
- Configuration iReduceStageConf = new JobConf(conf);
- // TODO replace with auto-reduce parallelism
- iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
- iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
- MyGroupByReducer.class.getName());
- iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
- iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
- IntWritable.class.getName());
- iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
- MRHelpers.translateMRConfToTez(iReduceStageConf);
-
- Configuration finalReduceConf = new JobConf(conf);
- finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
- finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
- MyOrderByNoOpReducer.class.getName());
- finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
- finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
- MRHelpers.translateMRConfToTez(finalReduceConf);
-
- MRHelpers.configureMRApiUsage(mapStageConf);
- MRHelpers.configureMRApiUsage(iReduceStageConf);
- MRHelpers.configureMRApiUsage(finalReduceConf);
-
- List<Vertex> vertices = new ArrayList<Vertex>();
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
- mapStageConf.writeXml(outputStream);
- String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
- mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
- TextInputFormat.class.getName());
- mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
- mapStageConf.setBoolean("mapred.mapper.new-api", true);
- DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(
- mapStageConf, stagingDir, true);
-
- Vertex mapVertex;
- ProcessorDescriptor mapProcessorDescriptor =
- ProcessorDescriptor.create(MapProcessor.class.getName())
- .setUserPayload(
- TezUtils.createUserPayloadFromConf(mapStageConf))
- .setHistoryText(mapStageHistoryText);
- if (!useMRSettings) {
- mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
- } else {
- mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
- MRHelpers.getResourceForMRMapper(mapStageConf));
- mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
- }
- mapVertex.addTaskLocalFiles(commonLocalResources)
- .addDataSource("MRInput", dsd);
- vertices.add(mapVertex);
-
- ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
- iReduceStageConf.writeXml(iROutputStream);
- String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
-
- ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
- ReduceProcessor.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
- .setHistoryText(iReduceStageHistoryText);
-
- Vertex intermediateVertex;
- if (!useMRSettings) {
- intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, 1);
- } else {
- intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor,
- 1, MRHelpers.getResourceForMRReducer(iReduceStageConf));
- intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
- }
- intermediateVertex.addTaskLocalFiles(commonLocalResources);
- vertices.add(intermediateVertex);
-
- ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
- finalReduceConf.writeXml(finalReduceOutputStream);
- String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
- UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
- Vertex finalReduceVertex;
-
- ProcessorDescriptor finalReduceProcessorDescriptor =
- ProcessorDescriptor.create(
- ReduceProcessor.class.getName())
- .setUserPayload(finalReducePayload)
- .setHistoryText(finalReduceStageHistoryText);
- if (!useMRSettings) {
- finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
- } else {
- finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
- MRHelpers.getResourceForMRReducer(finalReduceConf));
- finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
- }
- finalReduceVertex.addTaskLocalFiles(commonLocalResources);
- finalReduceVertex.addDataSink("MROutput",
- MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
- .build());
- vertices.add(finalReduceVertex);
-
- DAG dag = DAG.create("groupbyorderbymrrtest");
- for (Vertex v : vertices) {
- dag.addVertex(v);
- }
-
- OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
- .newBuilder(Text.class.getName(), IntWritable.class.getName(),
- HashPartitioner.class.getName()).setFromConfiguration(conf)
- .configureInput().useLegacyInput().done().build();
- dag.addEdge(
- Edge.create(dag.getVertex("initialmap"), dag.getVertex("ireduce1"),
- edgeConf1.createDefaultEdgeProperty()));
-
- OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
- .newBuilder(IntWritable.class.getName(), Text.class.getName(),
- HashPartitioner.class.getName()).setFromConfiguration(conf)
- .configureInput().useLegacyInput().done().build();
- dag.addEdge(
- Edge.create(dag.getVertex("ireduce1"), dag.getVertex("finalreduce"),
- edgeConf2.createDefaultEdgeProperty()));
-
- return dag;
- }
-
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
-
- String[] otherArgs = new GenericOptionsParser(conf, args).
- getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: groupbyorderbymrrtest <in> <out>");
- ToolRunner.printGenericCommandUsage(System.err);
- return 2;
- }
-
- String inputPath = otherArgs[0];
- String outputPath = otherArgs[1];
-
- UserGroupInformation.setConfiguration(conf);
-
- TezConfiguration tezConf = new TezConfiguration(conf);
- FileSystem fs = FileSystem.get(conf);
-
- if (fs.exists(new Path(outputPath))) {
- throw new FileAlreadyExistsException("Output directory "
- + outputPath + " already exists");
- }
-
- Map<String, LocalResource> localResources =
- new TreeMap<String, LocalResource>();
-
- String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
- TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
- Long.toString(System.currentTimeMillis());
- Path stagingDir = new Path(stagingDirStr);
- FileSystem pathFs = stagingDir.getFileSystem(tezConf);
- pathFs.mkdirs(new Path(stagingDirStr));
-
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
- stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
-
- TezClient tezClient = TezClient.create("groupbyorderbymrrtest", tezConf);
- tezClient.start();
-
- LOG.info("Submitting groupbyorderbymrrtest DAG as a new Tez Application");
-
- try {
- DAG dag = createDAG(conf, localResources, stagingDir, inputPath, outputPath, true);
-
- tezClient.waitTillReady();
-
- DAGClient dagClient = tezClient.submitDAG(dag);
-
- DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.error("groupbyorderbymrrtest failed, state=" + dagStatus.getState()
- + ", diagnostics=" + dagStatus.getDiagnostics());
- return -1;
- }
- LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
- return 0;
- } finally {
- tezClient.stop();
- }
- }
-
- public static void main(String[] args) throws Exception {
- Configuration configuration = new Configuration();
- GroupByOrderByMRRTest groupByOrderByMRRTest = new GroupByOrderByMRRTest();
- int status = ToolRunner.run(configuration, groupByOrderByMRRTest, args);
- System.exit(status);
- }
-}