You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/28 18:13:21 UTC
git commit: TEZ-689. Write wordcount in Tez API (bikas)
Updated Branches:
refs/heads/master 97e9a5ef3 -> 0134ebaa7
TEZ-689. Write wordcount in Tez API (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0134ebaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0134ebaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0134ebaa
Branch: refs/heads/master
Commit: 0134ebaa780beedb9fc218fa13ec911cc73e8e17
Parents: 97e9a5e
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Dec 28 09:12:47 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Dec 28 09:12:47 2013 -0800
----------------------------------------------------------------------
.../tez/mapreduce/examples/WordCount.java | 385 +++++++++++++++++--
1 file changed, 345 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0134ebaa/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index fc1103e..bb48e3f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -15,73 +15,378 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.StringTokenizer;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.base.Preconditions;
public class WordCount {
  /**
   * Map-side processor: tokenizes each line of input text and emits a
   * (word, 1) pair per token. This is the Tez LogicalIOProcessor replacement
   * for the old MapReduce TokenizerMapper.
   */
  public static class TokenProcessor implements LogicalIOProcessor {
    TezProcessorContext context;           // framework context saved at initialize()
    IntWritable one = new IntWritable(1);  // reused value buffer for every emitted count
    Text word = new Text();                // reused key buffer for every emitted word

    @Override
    public void initialize(TezProcessorContext processorContext)
        throws Exception {
      this.context = processorContext;
    }

    @Override
    public void handleEvents(List<Event> processorEvents) {
      // No events expected by this simple processor.
    }

    @Override
    public void close() throws Exception {
      // Nothing to release.
    }

    @Override
    public void run(Map<String, LogicalInput> inputs,
        Map<String, LogicalOutput> outputs) throws Exception {
      // The DAG wires exactly one input (MRInput over the text files) and one
      // output (OnFileSortedOutput feeding the shuffle edge to "summer").
      Preconditions.checkArgument(inputs.size() == 1);
      Preconditions.checkArgument(outputs.size() == 1);
      MRInput input = (MRInput) inputs.values().iterator().next();
      KeyValueReader kvReader = input.getReader();
      OnFileSortedOutput output = (OnFileSortedOutput) outputs.values().iterator().next();
      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
      while (kvReader.next()) {
        // Each value is one line of text; split on default whitespace delimiters.
        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          kvWriter.write(word, one);  // emit (word, 1); the Writable buffers are reused
        }
      }
    }

  }
+
  /**
   * Reduce-side processor: sums the counts for each word arriving from the
   * shuffle edge and writes (word, total) through MROutput. This is the Tez
   * replacement for the old MapReduce IntSumReducer.
   */
  public static class SumProcessor implements LogicalIOProcessor {
    TezProcessorContext context;  // kept for the commit handshake in run()

    @Override
    public void initialize(TezProcessorContext processorContext)
        throws Exception {
      this.context = processorContext;
    }

    @Override
    public void handleEvents(List<Event> processorEvents) {
      // No events expected by this simple processor.
    }

    @Override
    public void close() throws Exception {
      // Nothing to release.
    }

    @Override
    public void run(Map<String, LogicalInput> inputs,
        Map<String, LogicalOutput> outputs) throws Exception {
      Preconditions.checkArgument(inputs.size() == 1);
      MROutput out = (MROutput) outputs.values().iterator().next();
      KeyValueWriter kvWriter = out.getWriter();
      // ShuffledMergedInput groups all values for a key together.
      KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();
      while (kvReader.next()) {
        Text word = (Text) kvReader.getCurrentKey();
        int sum = 0;
        for (Object value : kvReader.getCurrentValues()) {
          sum += ((IntWritable) value).get();
        }
        kvWriter.write(word, new IntWritable(sum));
      }
      if (out.isCommitRequired()) {
        // Poll the AM until this task attempt is cleared to commit its output.
        while (!context.canCommit()) {
          Thread.sleep(100);
        }
        out.commit();
      }
    }

  }
+
  /**
   * Builds the two-vertex word-count DAG: a "tokenizer" vertex reading text
   * via MRInput feeds a single "summer" vertex over a scatter-gather
   * (shuffle) edge; the summer writes final counts through MROutput.
   *
   * NOTE(review): fs, localResources and stagingDir are currently unused in
   * this method — presumably reserved for shipping job resources; confirm
   * before removing them from the signature.
   */
  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
      Map<String, LocalResource> localResources, Path stagingDir,
      String inputPath, String outputPath) throws IOException {
    // Map-stage conf: intermediate (key, value) = (Text, IntWritable);
    // splits are grouped via TezGroupedSplitsInputFormat.
    Configuration mapStageConf = new JobConf((Configuration)tezConf);
    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
        Text.class.getName());
    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
        IntWritable.class.getName());
    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
        TezGroupedSplitsInputFormat.class.getName());

    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
    mapStageConf.setBoolean("mapred.mapper.new-api", true);

    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
        null);

    // Reduce-stage conf: same intermediate types; plain-text final output.
    Configuration finalReduceConf = new JobConf((Configuration)tezConf);
    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
        Text.class.getName());
    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
        IntWritable.class.getName());
    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
        TextOutputFormat.class.getName());
    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
    // NOTE(review): this sets "mapred.mapper.new-api" on the *reduce* conf;
    // the reducer-side flag is "mapred.reducer.new-api" — confirm intent.
    finalReduceConf.setBoolean("mapred.mapper.new-api", true);

    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
        mapStageConf);

    MRHelpers.doJobClientMagic(mapStageConf);
    MRHelpers.doJobClientMagic(finalReduceConf);

    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
        TextInputFormat.class.getName());
    // -1 tasks: map parallelism is decided at runtime by the AM split generator.
    int numMaps = -1;
    Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
        TokenProcessor.class.getName()),
        numMaps, MRHelpers.getMapResource(mapStageConf));
    tokenizerVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
    Map<String, String> mapEnv = new HashMap<String, String>();
    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
    tokenizerVertex.setTaskEnvironment(mapEnv);
    // Splits are generated in the AM rather than on the client.
    Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
    InputDescriptor id = new InputDescriptor(MRInput.class.getName()).
        setUserPayload(mapInputPayload);
    tokenizerVertex.addInput("MRInput", id, initializerClazz);

    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
    // A single reducer task sums all words.
    Vertex summerVertex = new Vertex("summer",
        new ProcessorDescriptor(
            SumProcessor.class.getName()).setUserPayload(finalReducePayload),
        1, MRHelpers.getReduceResource(finalReduceConf));
    summerVertex.setJavaOpts(
        MRHelpers.getReduceJavaOpts(finalReduceConf));
    Map<String, String> reduceEnv = new HashMap<String, String>();
    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
    summerVertex.setTaskEnvironment(reduceEnv);
    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
        .setUserPayload(finalReducePayload);
    summerVertex.addOutput("MROutput", od);

    // Shuffle edge: sorted map-side output -> merged reduce-side input.
    DAG dag = new DAG("WordCount");
    dag.addVertex(tokenizerVertex)
        .addVertex(summerVertex)
        .addEdge(
            new Edge(tokenizerVertex, summerVertex, new EdgeProperty(
                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                SchedulingType.SEQUENTIAL,
                new OutputDescriptor(OnFileSortedOutput.class.getName())
                    .setUserPayload(mapPayload),
                new InputDescriptor(ShuffledMergedInput.class.getName())
                    .setUserPayload(finalReducePayload))));
    return dag;
  }
+
+ private static void waitForTezSessionReady(TezSession tezSession)
+ throws IOException, TezException {
+ while (true) {
+ TezSessionStatus status = tezSession.getSessionStatus();
+ if (status.equals(TezSessionStatus.SHUTDOWN)) {
+ throw new RuntimeException("TezSession has already shutdown");
+ }
+ if (status.equals(TezSessionStatus.READY)) {
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+
+ private static void printUsage() {
+ System.err.println("Usage: " + " wordcount <in1> <out1>");
+ }
+
+ private Credentials credentials = new Credentials();
+
  /**
   * Runs the word-count job through a new Tez session: prepares the staging
   * dir and credentials, builds the DAG, submits it, polls until it reaches a
   * terminal state (printing per-vertex progress and counters), then cleans
   * up the staging dir and stops the session.
   *
   * @param inputPath directory of input text files
   * @param outputPath output directory; must not already exist
   * @param conf base Hadoop configuration, or null to load defaults
   * @return true iff the DAG finished in SUCCEEDED state
   * @throws FileAlreadyExistsException if outputPath already exists
   */
  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
    System.out.println("Running WordCount");
    // conf and UGI
    TezConfiguration tezConf;
    if (conf != null) {
      tezConf = new TezConfiguration(conf);
    } else {
      tezConf = new TezConfiguration();
    }
    UserGroupInformation.setConfiguration(tezConf);
    String user = UserGroupInformation.getCurrentUser().getShortUserName();

    TezClient tezClient = new TezClient(tezConf);
    ApplicationId appId = tezClient.createApplication();

    // staging dir
    FileSystem fs = FileSystem.get(tezConf);
    // NOTE(review): the extra "+ Path.SEPARATOR" on the last line yields a
    // doubled separator before the appId; Path normalization masks it, but
    // it looks unintended.
    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
        + Path.SEPARATOR + appId.toString();
    Path stagingDir = new Path(stagingDirStr);
    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
    stagingDir = fs.makeQualified(stagingDir);

    // security
    TokenCache.obtainTokensForNamenodes(credentials, new Path[] {stagingDir}, tezConf);
    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);

    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
        MRHelpers.getMRAMJavaOpts(tezConf));

    // No need to add jar containing this class as assumed to be part of
    // the tez jars.

    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
    // is the same filesystem as the one used for Input/Output.

    TezSession tezSession = null;
    AMConfiguration amConfig = new AMConfiguration(null,
        null, tezConf, credentials);

    TezSessionConfiguration sessionConfig =
        new TezSessionConfiguration(amConfig, tezConf);
    tezSession = new TezSession("WordCountSession", appId,
        sessionConfig);
    tezSession.start();

    DAGStatus dagStatus = null;
    DAGClient dagClient = null;
    String[] vNames = { "tokenizer", "summer" };

    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
    try {
      // Fail fast rather than letting the committer clobber existing output.
      if (fs.exists(new Path(outputPath))) {
        throw new FileAlreadyExistsException("Output directory "
            + outputPath + " already exists");
      }

      // NOTE(review): localResources is created but never populated, and
      // createDAG ignores it — presumably a placeholder; confirm.
      Map<String, LocalResource> localResources =
          new TreeMap<String, LocalResource>();

      DAG dag = createDAG(fs, tezConf, localResources,
          stagingDir, inputPath, outputPath);

      waitForTezSessionReady(tezSession);
      dagClient = tezSession.submitDAG(dag);
      //dagClient = tezClient.submitDAGApplication(dag, amConfig);

      // monitoring
      // First, poll every 500ms until the DAG leaves its pre-running states.
      while (true) {
        dagStatus = dagClient.getDAGStatus(statusGetOpts);
        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
            dagStatus.getState() == DAGStatus.State.FAILED ||
            dagStatus.getState() == DAGStatus.State.KILLED ||
            dagStatus.getState() == DAGStatus.State.ERROR) {
          break;
        }
        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          // continue;
        }
      }


      // Then print progress once a second until the DAG terminates.
      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
        try {
          ExampleDriver.printDAGStatus(dagClient, vNames);
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // continue;
          }
          dagStatus = dagClient.getDAGStatus(statusGetOpts);
        } catch (TezException e) {
          // NOTE(review): the exception is discarded before exiting; consider
          // printing it so monitoring failures are diagnosable. Exiting here
          // also skips the finally-block cleanup below.
          System.exit(-1);
        }
      }
      ExampleDriver.printDAGStatus(dagClient, vNames,
          true, true);
      System.out.println("DAG completed. " + "FinalState=" + dagStatus.getState());
      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
        System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
        return false;
      }
      return true;
    } finally {
      // Always remove the staging dir and stop the session, success or not.
      fs.delete(stagingDir, true);
      tezSession.stop();
    }
  }
public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: wordcount <in> <out>");
+ if ((args.length%2) != 0) {
+ printUsage();
System.exit(2);
}
- Job job = new Job(conf, "word count");
- job.setJarByClass(WordCount.class);
- job.setMapperClass(TokenizerMapper.class);
- job.setCombinerClass(IntSumReducer.class);
- job.setReducerClass(IntSumReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
+ WordCount job = new WordCount();
+ job.run(args[0], args[1], null);
}
+
}