You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/16 02:54:55 UTC
[07/10] TEZ-1055. Rename tez-mapreduce-examples to tez-examples
(Hitesh Shah via bikas)
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
deleted file mode 100644
index c9b51c7..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This program uses map/reduce to just run a distributed job where there is
- * no interaction between the tasks and each task write a large unsorted
- * random binary sequence file of BytesWritable.
- * In order for this program to generate data for terasort with 10-byte keys
- * and 90-byte values, have the following config:
- * <xmp>
- * <?xml version="1.0"?>
- * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- * <configuration>
- * <property>
- * <name>mapreduce.randomwriter.minkey</name>
- * <value>10</value>
- * </property>
- * <property>
- * <name>mapreduce.randomwriter.maxkey</name>
- * <value>10</value>
- * </property>
- * <property>
- * <name>mapreduce.randomwriter.minvalue</name>
- * <value>90</value>
- * </property>
- * <property>
- * <name>mapreduce.randomwriter.maxvalue</name>
- * <value>90</value>
- * </property>
- * <property>
- * <name>mapreduce.randomwriter.totalbytes</name>
- * <value>1099511627776</value>
- * </property>
- * </configuration></xmp>
- *
- * Equivalently, {@link RandomWriter} also supports all the above options
- * and ones supported by {@link GenericOptionsParser} via the command-line.
- */
-public class RandomWriter extends Configured implements Tool {
- public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
- public static final String BYTES_PER_MAP =
- "mapreduce.randomwriter.bytespermap";
- public static final String MAPS_PER_HOST =
- "mapreduce.randomwriter.mapsperhost";
- public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
- public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
- public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
- public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
-
- /**
- * User counters
- */
- static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
-
- /**
- * A custom input format that creates virtual inputs of a single string
- * for each map.
- */
- static class RandomInputFormat extends InputFormat<Text, Text> {
-
- /**
- * Generate the requested number of file splits, with the filename
- * set to the filename of the output file.
- */
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- List<InputSplit> result = new ArrayList<InputSplit>();
- Path outDir = FileOutputFormat.getOutputPath(job);
- int numSplits =
- job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
- for(int i=0; i < numSplits; ++i) {
- result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
- (String[])null));
- }
- return result;
- }
-
- /**
- * Return a single record (filename, "") where the filename is taken from
- * the file split.
- */
- static class RandomRecordReader extends RecordReader<Text, Text> {
- Path name;
- Text key = null;
- Text value = new Text();
- public RandomRecordReader(Path p) {
- name = p;
- }
-
- public void initialize(InputSplit split,
- TaskAttemptContext context)
- throws IOException, InterruptedException {
-
- }
-
- public boolean nextKeyValue() {
- if (name != null) {
- key = new Text();
- key.set(name.getName());
- name = null;
- return true;
- }
- return false;
- }
-
- public Text getCurrentKey() {
- return key;
- }
-
- public Text getCurrentValue() {
- return value;
- }
-
- public void close() {}
-
- public float getProgress() {
- return 0.0f;
- }
- }
-
- public RecordReader<Text, Text> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new RandomRecordReader(((FileSplit) split).getPath());
- }
- }
-
- static class RandomMapper extends Mapper<WritableComparable, Writable,
- BytesWritable, BytesWritable> {
-
- private long numBytesToWrite;
- private int minKeySize;
- private int keySizeRange;
- private int minValueSize;
- private int valueSizeRange;
- private Random random = new Random();
- private BytesWritable randomKey = new BytesWritable();
- private BytesWritable randomValue = new BytesWritable();
-
- private void randomizeBytes(byte[] data, int offset, int length) {
- for(int i=offset + length - 1; i >= offset; --i) {
- data[i] = (byte) random.nextInt(256);
- }
- }
-
- /**
- * Given an output filename, write a bunch of random records to it.
- */
- public void map(WritableComparable key,
- Writable value,
- Context context) throws IOException,InterruptedException {
- int itemCount = 0;
- while (numBytesToWrite > 0) {
- int keyLength = minKeySize +
- (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
- randomKey.setSize(keyLength);
- randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
- int valueLength = minValueSize +
- (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
- randomValue.setSize(valueLength);
- randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
- context.write(randomKey, randomValue);
- numBytesToWrite -= keyLength + valueLength;
- context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
- context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
- if (++itemCount % 200 == 0) {
- context.setStatus("wrote record " + itemCount + ". " +
- numBytesToWrite + " bytes left.");
- }
- }
- context.setStatus("done with " + itemCount + " records.");
- }
-
- /**
- * Save the values out of the configuaration that we need to write
- * the data.
- */
- @Override
- public void setup(Context context) {
- Configuration conf = context.getConfiguration();
- numBytesToWrite = conf.getLong(BYTES_PER_MAP,
- 1*1024*1024*1024);
- minKeySize = conf.getInt(MIN_KEY, 10);
- keySizeRange =
- conf.getInt(MAX_KEY, 1000) - minKeySize;
- minValueSize = conf.getInt(MIN_VALUE, 0);
- valueSizeRange =
- conf.getInt(MAX_VALUE, 20000) - minValueSize;
- }
- }
-
- /**
- * This is the main routine for launching a distributed random write job.
- * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
- * The reduce doesn't do anything.
- *
- * @throws IOException
- */
- public int run(String[] args) throws Exception {
- if (args.length == 0) {
- System.out.println("Usage: writer <out-dir>");
- ToolRunner.printGenericCommandUsage(System.out);
- return 2;
- }
-
- Path outDir = new Path(args[0]);
- Configuration conf = getConf();
- JobClient client = new JobClient(conf);
- ClusterStatus cluster = client.getClusterStatus();
- int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
- long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
- 1*1024*1024*1024);
- if (numBytesToWritePerMap == 0) {
- System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
- return -2;
- }
- long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
- numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
- int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
- if (numMaps == 0 && totalBytesToWrite > 0) {
- numMaps = 1;
- conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
- }
- conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
-
- Job job = new Job(conf);
-
- job.setJarByClass(RandomWriter.class);
- job.setJobName("random-writer");
- FileOutputFormat.setOutputPath(job, outDir);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(BytesWritable.class);
- job.setInputFormatClass(RandomInputFormat.class);
- job.setMapperClass(RandomMapper.class);
- job.setReducerClass(Reducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
- System.out.println("Running " + numMaps + " maps.");
-
- // reducer NONE
- job.setNumReduceTasks(0);
-
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int ret = job.waitForCompletion(true) ? 0 : 1;
- Date endTime = new Date();
- System.out.println("Job ended: " + endTime);
- System.out.println("The job took " +
- (endTime.getTime() - startTime.getTime()) /1000 +
- " seconds.");
-
- return ret;
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
- System.exit(res);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
deleted file mode 100644
index cdae905..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This is an example Hadoop Map/Reduce application.
- * It reads the text input files that must contain two integers per a line.
- * The output is sorted by the first and second number and grouped on the
- * first number.
- *
- * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
- * <i>in-dir</i> <i>out-dir</i>
- */
-public class SecondarySort extends Configured implements Tool {
-
- /**
- * Define a pair of integers that are writable.
- * They are serialized in a byte comparable format.
- */
- public static class IntPair
- implements WritableComparable<IntPair> {
- private int first = 0;
- private int second = 0;
-
- /**
- * Set the left and right values.
- */
- public void set(int left, int right) {
- first = left;
- second = right;
- }
- public int getFirst() {
- return first;
- }
- public int getSecond() {
- return second;
- }
- /**
- * Read the two integers.
- * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
- */
- @Override
- public void readFields(DataInput in) throws IOException {
- first = in.readInt() + Integer.MIN_VALUE;
- second = in.readInt() + Integer.MIN_VALUE;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(first - Integer.MIN_VALUE);
- out.writeInt(second - Integer.MIN_VALUE);
- }
- @Override
- public int hashCode() {
- return first * 157 + second;
- }
- @Override
- public boolean equals(Object right) {
- if (right instanceof IntPair) {
- IntPair r = (IntPair) right;
- return r.first == first && r.second == second;
- } else {
- return false;
- }
- }
- /** A Comparator that compares serialized IntPair. */
- public static class Comparator extends WritableComparator {
- public Comparator() {
- super(IntPair.class);
- }
-
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1, l1, b2, s2, l2);
- }
- }
-
- static { // register this comparator
- WritableComparator.define(IntPair.class, new Comparator());
- }
-
- @Override
- public int compareTo(IntPair o) {
- if (first != o.first) {
- return first < o.first ? -1 : 1;
- } else if (second != o.second) {
- return second < o.second ? -1 : 1;
- } else {
- return 0;
- }
- }
- }
-
- /**
- * Partition based on the first part of the pair.
- */
- public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
- @Override
- public int getPartition(IntPair key, IntWritable value,
- int numPartitions) {
- return Math.abs(key.getFirst() * 127) % numPartitions;
- }
- }
-
- /**
- * Compare only the first part of the pair, so that reduce is called once
- * for each value of the first part.
- */
- public static class FirstGroupingComparator
- implements RawComparator<IntPair> {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
- b2, s2, Integer.SIZE/8);
- }
-
- @Override
- public int compare(IntPair o1, IntPair o2) {
- int l = o1.getFirst();
- int r = o2.getFirst();
- return l == r ? 0 : (l < r ? -1 : 1);
- }
- }
-
- /**
- * Read two integers from each line and generate a key, value pair
- * as ((left, right), right).
- */
- public static class MapClass
- extends Mapper<LongWritable, Text, IntPair, IntWritable> {
-
- private final IntPair key = new IntPair();
- private final IntWritable value = new IntWritable();
-
- @Override
- public void map(LongWritable inKey, Text inValue,
- Context context) throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(inValue.toString());
- int left = 0;
- int right = 0;
- if (itr.hasMoreTokens()) {
- left = Integer.parseInt(itr.nextToken());
- if (itr.hasMoreTokens()) {
- right = Integer.parseInt(itr.nextToken());
- }
- key.set(left, right);
- value.set(right);
- context.write(key, value);
- }
- }
- }
-
- /**
- * A reducer class that just emits the sum of the input values.
- */
- public static class Reduce
- extends Reducer<IntPair, IntWritable, Text, IntWritable> {
- private static final Text SEPARATOR =
- new Text("------------------------------------------------");
- private final Text first = new Text();
-
- @Override
- public void reduce(IntPair key, Iterable<IntWritable> values,
- Context context
- ) throws IOException, InterruptedException {
- context.write(SEPARATOR, null);
- first.set(Integer.toString(key.getFirst()));
- for(IntWritable value: values) {
- context.write(first, value);
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: secondarysort <in> <out>");
- ToolRunner.printGenericCommandUsage(System.out);
- System.exit(2);
- }
- Job job = new Job(conf, "secondary sort");
- job.setJarByClass(SecondarySort.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(Reduce.class);
-
- // group and partition by the first int in the pair
- job.setPartitionerClass(FirstPartitioner.class);
- job.setGroupingComparatorClass(FirstGroupingComparator.class);
-
- // the map output is IntPair, IntWritable
- job.setMapOutputKeyClass(IntPair.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- // the reduce output is Text, IntWritable
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new SecondarySort(), args);
- System.exit(res);
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
deleted file mode 100644
index 4eb5484..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.net.URI;
-import java.util.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This is the trivial map/reduce program that does absolutely nothing
- * other than use the framework to fragment and sort the input values.
- *
- * To run: bin/hadoop jar build/hadoop-examples.jar sort
- * [-r <i>reduces</i>]
- * [-inFormat <i>input format class</i>]
- * [-outFormat <i>output format class</i>]
- * [-outKey <i>output key class</i>]
- * [-outValue <i>output value class</i>]
- * [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
- * <i>in-dir</i> <i>out-dir</i>
- */
-public class Sort<K,V> extends Configured implements Tool {
- public static final String REDUCES_PER_HOST =
- "mapreduce.sort.reducesperhost";
- private Job job = null;
-
- static int printUsage() {
- System.out.println("sort [-r <reduces>] " +
- "[-inFormat <input format class>] " +
- "[-outFormat <output format class>] " +
- "[-outKey <output key class>] " +
- "[-outValue <output value class>] " +
- "[-totalOrder <pcnt> <num samples> <max splits>] " +
- "<input> <output>");
- ToolRunner.printGenericCommandUsage(System.out);
- return 2;
- }
-
- /**
- * The main driver for sort program.
- * Invoke this method to submit the map/reduce job.
- * @throws IOException When there is communication problems with the
- * job tracker.
- */
- public int run(String[] args) throws Exception {
-
- Configuration conf = getConf();
- JobClient client = new JobClient(conf);
- ClusterStatus cluster = client.getClusterStatus();
- int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
- String sort_reduces = conf.get(REDUCES_PER_HOST);
- if (sort_reduces != null) {
- num_reduces = cluster.getTaskTrackers() *
- Integer.parseInt(sort_reduces);
- }
- Class<? extends InputFormat> inputFormatClass =
- SequenceFileInputFormat.class;
- Class<? extends OutputFormat> outputFormatClass =
- SequenceFileOutputFormat.class;
- Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
- Class<? extends Writable> outputValueClass = BytesWritable.class;
- List<String> otherArgs = new ArrayList<String>();
- InputSampler.Sampler<K,V> sampler = null;
- for(int i=0; i < args.length; ++i) {
- try {
- if ("-r".equals(args[i])) {
- num_reduces = Integer.parseInt(args[++i]);
- } else if ("-inFormat".equals(args[i])) {
- inputFormatClass =
- Class.forName(args[++i]).asSubclass(InputFormat.class);
- } else if ("-outFormat".equals(args[i])) {
- outputFormatClass =
- Class.forName(args[++i]).asSubclass(OutputFormat.class);
- } else if ("-outKey".equals(args[i])) {
- outputKeyClass =
- Class.forName(args[++i]).asSubclass(WritableComparable.class);
- } else if ("-outValue".equals(args[i])) {
- outputValueClass =
- Class.forName(args[++i]).asSubclass(Writable.class);
- } else if ("-totalOrder".equals(args[i])) {
- double pcnt = Double.parseDouble(args[++i]);
- int numSamples = Integer.parseInt(args[++i]);
- int maxSplits = Integer.parseInt(args[++i]);
- if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
- sampler =
- new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
- } else {
- otherArgs.add(args[i]);
- }
- } catch (NumberFormatException except) {
- System.out.println("ERROR: Integer expected instead of " + args[i]);
- return printUsage();
- } catch (ArrayIndexOutOfBoundsException except) {
- System.out.println("ERROR: Required parameter missing from " +
- args[i-1]);
- return printUsage(); // exits
- }
- }
- // Set user-supplied (possibly default) job configs
- job = new Job(conf);
- job.setJobName("sorter");
- job.setJarByClass(Sort.class);
-
- job.setMapperClass(Mapper.class);
- job.setReducerClass(Reducer.class);
-
- job.setNumReduceTasks(num_reduces);
-
- job.setInputFormatClass(inputFormatClass);
- job.setOutputFormatClass(outputFormatClass);
-
- job.setOutputKeyClass(outputKeyClass);
- job.setOutputValueClass(outputValueClass);
-
- // Make sure there are exactly 2 parameters left.
- if (otherArgs.size() != 2) {
- System.out.println("ERROR: Wrong number of parameters: " +
- otherArgs.size() + " instead of 2.");
- return printUsage();
- }
- FileInputFormat.setInputPaths(job, otherArgs.get(0));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
-
- if (sampler != null) {
- System.out.println("Sampling input to effect total-order sort...");
- job.setPartitionerClass(TotalOrderPartitioner.class);
- Path inputDir = FileInputFormat.getInputPaths(job)[0];
- inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
- Path partitionFile = new Path(inputDir, "_sortPartitioning");
- TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
- InputSampler.<K,V>writePartitionFile(job, sampler);
- URI partitionUri = new URI(partitionFile.toString() +
- "#" + "_sortPartitioning");
- DistributedCache.addCacheFile(partitionUri, conf);
- }
-
- System.out.println("Running on " +
- cluster.getTaskTrackers() +
- " nodes to sort from " +
- FileInputFormat.getInputPaths(job)[0] + " into " +
- FileOutputFormat.getOutputPath(job) +
- " with " + num_reduces + " reduces.");
- Date startTime = new Date();
- System.out.println("Job started: " + startTime);
- int ret = job.waitForCompletion(true) ? 0 : 1;
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took " +
- (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
- return ret;
- }
-
-
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new Sort(), args);
- System.exit(res);
- }
-
- /**
- * Get the last job that was run using this instance.
- * @return the results of the last job that was run
- */
- public Job getResult() {
- return job;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
deleted file mode 100644
index 72c14fc..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.PreWarmVertex;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutputLegacy;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An MRR job built on top of word count to return words sorted by
- * their frequency of occurrence.
- *
- * Use -DUSE_TEZ_SESSION=true to run jobs in a session mode.
- * If multiple input/outputs are provided, this job will process each pair
- * as a separate DAG in a sequential manner.
- * Use -DINTER_JOB_SLEEP_INTERVAL=<N> where N is the sleep interval in seconds
- * between the sequential DAGs.
- */
-public class TestOrderedWordCount extends Configured implements Tool {
-
- private static Log LOG = LogFactory.getLog(TestOrderedWordCount.class);
-
- public static class TokenizerMapper
- extends Mapper<Object, Text, Text, IntWritable>{
-
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
-
- public void map(Object key, Text value, Context context
- ) throws IOException, InterruptedException {
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- word.set(itr.nextToken());
- context.write(word, one);
- }
- }
- }
-
- public static class IntSumReducer
- extends Reducer<Text,IntWritable,IntWritable, Text> {
- private IntWritable result = new IntWritable();
-
- public void reduce(Text key, Iterable<IntWritable> values,
- Context context
- ) throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(result, key);
- }
- }
-
- /**
- * Shuffle ensures ordering based on count of employees per department
- * hence the final reducer is a no-op and just emits the department name
- * with the employee count per department.
- */
- public static class MyOrderByNoOpReducer
- extends Reducer<IntWritable, Text, Text, IntWritable> {
-
- public void reduce(IntWritable key, Iterable<Text> values,
- Context context
- ) throws IOException, InterruptedException {
- for (Text word : values) {
- context.write(word, key);
- }
- }
- }
-
- private Credentials credentials = new Credentials();
-
- @VisibleForTesting
- public DAG createDAG(FileSystem fs, Configuration conf,
- Map<String, LocalResource> commonLocalResources, Path stagingDir,
- int dagIndex, String inputPath, String outputPath,
- boolean generateSplitsInClient) throws Exception {
-
- Configuration mapStageConf = new JobConf(conf);
- mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
- TokenizerMapper.class.getName());
-
- MRHelpers.translateMRConfToTez(mapStageConf);
-
- Configuration iReduceStageConf = new JobConf(conf);
- // TODO replace with auto-reduce parallelism
- iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
- iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
- IntSumReducer.class.getName());
- iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
- iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
- IntWritable.class.getName());
- iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
- MRHelpers.translateMRConfToTez(iReduceStageConf);
-
- Configuration finalReduceConf = new JobConf(conf);
- finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
- finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
- MyOrderByNoOpReducer.class.getName());
- finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
- finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
- MRHelpers.translateMRConfToTez(finalReduceConf);
-
- MRHelpers.configureMRApiUsage(mapStageConf);
- MRHelpers.configureMRApiUsage(iReduceStageConf);
- MRHelpers.configureMRApiUsage(finalReduceConf);
-
- List<Vertex> vertices = new ArrayList<Vertex>();
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
- mapStageConf.writeXml(outputStream);
- String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
- DataSourceDescriptor dsd;
- if (generateSplitsInClient) {
- mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
- TextInputFormat.class.getName());
- mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
- mapStageConf.setBoolean("mapred.mapper.new-api", true);
- dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
- } else {
- dsd = MRInputLegacy.createConfigurer(mapStageConf, TextInputFormat.class, inputPath).create();
- }
-
- Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
- MapProcessor.class.getName()).setUserPayload(
- TezUtils.createUserPayloadFromConf(mapStageConf))
- .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
- mapVertex.addDataSource("MRInput", dsd);
- vertices.add(mapVertex);
-
- ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
- iReduceStageConf.writeXml(iROutputStream);
- String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
- Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
- ReduceProcessor.class.getName())
- .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
- .setHistoryText(iReduceStageHistoryText), 2);
- ivertex.setTaskLocalFiles(commonLocalResources);
- vertices.add(ivertex);
-
- ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
- finalReduceConf.writeXml(finalReduceOutputStream);
- String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
- UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
- Vertex finalReduceVertex = new Vertex("finalreduce",
- new ProcessorDescriptor(
- ReduceProcessor.class.getName())
- .setUserPayload(finalReducePayload)
- .setHistoryText(finalReduceStageHistoryText), 1);
- finalReduceVertex.setTaskLocalFiles(commonLocalResources);
- finalReduceVertex.addDataSink("MROutput",
- MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
- .create());
- vertices.add(finalReduceVertex);
-
- DAG dag = new DAG("OrderedWordCount" + dagIndex);
- for (int i = 0; i < vertices.size(); ++i) {
- dag.addVertex(vertices.get(i));
- }
-
- OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
- .newBuilder(Text.class.getName(), IntWritable.class.getName(),
- HashPartitioner.class.getName()).setFromConfiguration(conf)
- .configureInput().useLegacyInput().done().build();
- dag.addEdge(
- new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
- edgeConf1.createDefaultEdgeProperty()));
-
- OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
- .newBuilder(IntWritable.class.getName(), Text.class.getName(),
- HashPartitioner.class.getName()).setFromConfiguration(conf)
- .configureInput().useLegacyInput().done().build();
- dag.addEdge(
- new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
- edgeConf2.createDefaultEdgeProperty()));
-
- return dag;
- }
-
- private static void printUsage() {
- String options = " [-generateSplitsInClient true/<false>]";
- System.err.println("Usage: testorderedwordcount <in> <out>" + options);
- System.err.println("Usage (In Session Mode):"
- + " testorderedwordcount <in1> <out1> ... <inN> <outN>" + options);
- ToolRunner.printGenericCommandUsage(System.err);
- }
-
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
- boolean generateSplitsInClient;
-
- SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
- try {
- generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
- otherArgs = splitCmdLineParser.getRemainingArgs();
- } catch (ParseException e1) {
- System.err.println("Invalid options");
- printUsage();
- return 2;
- }
-
- boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
- long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
- * 1000;
-
- boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
-
- if (((otherArgs.length%2) != 0)
- || (!useTezSession && otherArgs.length != 2)) {
- printUsage();
- return 2;
- }
-
- List<String> inputPaths = new ArrayList<String>();
- List<String> outputPaths = new ArrayList<String>();
-
- for (int i = 0; i < otherArgs.length; i+=2) {
- inputPaths.add(otherArgs[i]);
- outputPaths.add(otherArgs[i+1]);
- }
-
- UserGroupInformation.setConfiguration(conf);
-
- TezConfiguration tezConf = new TezConfiguration(conf);
- TestOrderedWordCount instance = new TestOrderedWordCount();
-
- FileSystem fs = FileSystem.get(conf);
-
- String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
- TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
- Long.toString(System.currentTimeMillis());
- Path stagingDir = new Path(stagingDirStr);
- FileSystem pathFs = stagingDir.getFileSystem(tezConf);
- pathFs.mkdirs(new Path(stagingDirStr));
-
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
- stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
-
- TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
- TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
- // No need to add jar containing this class as assumed to be part of
- // the tez jars.
-
- // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
- // is the same filesystem as the one used for Input/Output.
-
- if (useTezSession) {
- LOG.info("Creating Tez Session");
- tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
- } else {
- tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
- }
- TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
- null, instance.credentials);
- tezSession.start();
-
- DAGStatus dagStatus = null;
- DAGClient dagClient = null;
- String[] vNames = { "initialmap", "intermediate_reducer",
- "finalreduce" };
-
- Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- try {
- for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
- if (dagIndex != 1
- && interJobSleepTimeout > 0) {
- try {
- LOG.info("Sleeping between jobs, sleepInterval="
- + (interJobSleepTimeout/1000));
- Thread.sleep(interJobSleepTimeout);
- } catch (InterruptedException e) {
- LOG.info("Main thread interrupted. Breaking out of job loop");
- break;
- }
- }
-
- String inputPath = inputPaths.get(dagIndex-1);
- String outputPath = outputPaths.get(dagIndex-1);
-
- if (fs.exists(new Path(outputPath))) {
- throw new FileAlreadyExistsException("Output directory "
- + outputPath + " already exists");
- }
- LOG.info("Running OrderedWordCount DAG"
- + ", dagIndex=" + dagIndex
- + ", inputPath=" + inputPath
- + ", outputPath=" + outputPath);
-
- Map<String, LocalResource> localResources =
- new TreeMap<String, LocalResource>();
-
- DAG dag = instance.createDAG(fs, conf, localResources,
- stagingDir, dagIndex, inputPath, outputPath,
- generateSplitsInClient);
-
- boolean doPreWarm = dagIndex == 1 && useTezSession
- && conf.getBoolean("PRE_WARM_SESSION", true);
- int preWarmNumContainers = 0;
- if (doPreWarm) {
- preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
- if (preWarmNumContainers <= 0) {
- doPreWarm = false;
- }
- }
- if (doPreWarm) {
- LOG.info("Pre-warming Session");
- PreWarmVertex preWarmVertex = new PreWarmVertex("PreWarm", preWarmNumContainers, dag
- .getVertex("initialmap").getTaskResource());
- preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
- preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
- preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
-
- tezSession.preWarm(preWarmVertex);
- }
-
- if (useTezSession) {
- LOG.info("Waiting for TezSession to get into ready state");
- waitForTezSessionReady(tezSession);
- LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
- dagClient = tezSession.submitDAG(dag);
- LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
- } else {
- LOG.info("Submitting DAG as a new Tez Application");
- dagClient = tezSession.submitDAG(dag);
- }
-
- while (true) {
- dagStatus = dagClient.getDAGStatus(statusGetOpts);
- if (dagStatus.getState() == DAGStatus.State.RUNNING ||
- dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
- dagStatus.getState() == DAGStatus.State.KILLED ||
- dagStatus.getState() == DAGStatus.State.ERROR) {
- break;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // continue;
- }
- }
-
-
- while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
- dagStatus.getState() != DAGStatus.State.FAILED &&
- dagStatus.getState() != DAGStatus.State.KILLED &&
- dagStatus.getState() != DAGStatus.State.ERROR) {
- if (dagStatus.getState() == DAGStatus.State.RUNNING) {
- ExampleDriver.printDAGStatus(dagClient, vNames);
- }
- try {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // continue;
- }
- dagStatus = dagClient.getDAGStatus(statusGetOpts);
- } catch (TezException e) {
- LOG.fatal("Failed to get application progress. Exiting");
- return -1;
- }
- }
- ExampleDriver.printDAGStatus(dagClient, vNames,
- true, true);
- LOG.info("DAG " + dagIndex + " completed. "
- + "FinalState=" + dagStatus.getState());
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- LOG.info("DAG " + dagIndex + " diagnostics: "
- + dagStatus.getDiagnostics());
- }
- }
- } catch (Exception e) {
- LOG.error("Error occurred when submitting/running DAGs", e);
- throw e;
- } finally {
- if (!retainStagingDir) {
- pathFs.delete(stagingDir, true);
- }
- LOG.info("Shutting down session");
- tezSession.stop();
- }
-
- if (!useTezSession) {
- ExampleDriver.printDAGStatus(dagClient, vNames);
- LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
- }
- return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
- }
-
- private static void waitForTezSessionReady(TezClient tezSession)
- throws IOException, TezException, InterruptedException {
- tezSession.waitTillReady();
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new TestOrderedWordCount(), args);
- System.exit(res);
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
deleted file mode 100644
index a9b23e0..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.GroupInputEdge;
-import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Output;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-public class UnionExample {
-
- public static class TokenProcessor extends SimpleMRProcessor {
- IntWritable one = new IntWritable(1);
- Text word = new Text();
-
- public TokenProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void run() throws Exception {
- Preconditions.checkArgument(getInputs().size() == 1);
- boolean inUnion = true;
- if (getContext().getTaskVertexName().equals("map3")) {
- inUnion = false;
- }
- Preconditions.checkArgument(getOutputs().size() == (inUnion ? 2 : 1));
- Preconditions.checkArgument(getOutputs().containsKey("checker"));
- MRInput input = (MRInput) getInputs().values().iterator().next();
- KeyValueReader kvReader = input.getReader();
- Output output = getOutputs().get("checker");
- KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
- MROutput parts = null;
- KeyValueWriter partsWriter = null;
- if (inUnion) {
- parts = (MROutput) getOutputs().get("parts");
- partsWriter = parts.getWriter();
- }
- while (kvReader.next()) {
- StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
- while (itr.hasMoreTokens()) {
- word.set(itr.nextToken());
- kvWriter.write(word, one);
- if (inUnion) {
- partsWriter.write(word, one);
- }
- }
- }
- }
-
- }
-
- public static class UnionProcessor extends SimpleMRProcessor {
- IntWritable one = new IntWritable(1);
-
- public UnionProcessor(ProcessorContext context) {
- super(context);
- }
-
- @Override
- public void run() throws Exception {
- Preconditions.checkArgument(getInputs().size() == 2);
- Preconditions.checkArgument(getOutputs().size() == 2);
- MROutput out = (MROutput) getOutputs().get("union");
- MROutput allParts = (MROutput) getOutputs().get("all-parts");
- KeyValueWriter kvWriter = out.getWriter();
- KeyValueWriter partsWriter = allParts.getWriter();
- Map<String, AtomicInteger> unionKv = Maps.newHashMap();
- LogicalInput union = getInputs().get("union");
- KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
- while (kvReader.next()) {
- String word = ((Text) kvReader.getCurrentKey()).toString();
- IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
- for (int i = 0; i < intVal.get(); ++i) {
- partsWriter.write(word, one);
- }
- AtomicInteger value = unionKv.get(word);
- if (value == null) {
- unionKv.put(word, new AtomicInteger(intVal.get()));
- } else {
- value.addAndGet(intVal.get());
- }
- }
- LogicalInput map3 = getInputs().get("map3");
- kvReader = (KeyValuesReader) map3.getReader();
- while (kvReader.next()) {
- String word = ((Text) kvReader.getCurrentKey()).toString();
- IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
- AtomicInteger value = unionKv.get(word);
- if (value == null) {
- throw new TezUncheckedException("Expected to exist: " + word);
- } else {
- value.getAndAdd(intVal.get() * -2);
- }
- }
- for (AtomicInteger value : unionKv.values()) {
- if (value.get() != 0) {
- throw new TezUncheckedException("Unexpected non-zero value");
- }
- }
- kvWriter.write("Union", new IntWritable(unionKv.size()));
- }
-
- }
-
- private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
- Map<String, LocalResource> localResources, Path stagingDir,
- String inputPath, String outputPath) throws IOException {
- DAG dag = new DAG("UnionExample");
-
- int numMaps = -1;
- Configuration inputConf = new Configuration(tezConf);
- MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
- inputPath);
- DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
-
- Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
- TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
- Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
- TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
- Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
- TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
- Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
- UnionProcessor.class.getName()), 1);
-
- Configuration outputConf = new Configuration(tezConf);
- DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
- TextOutputFormat.class, outputPath).create();
- checkerVertex.addDataSink("union", od);
-
-
- Configuration allPartsConf = new Configuration(tezConf);
- DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
- TextOutputFormat.class, outputPath + "-all-parts").create();
- checkerVertex.addDataSink("all-parts", od2);
-
- Configuration partsConf = new Configuration(tezConf);
- DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
- TextOutputFormat.class, outputPath + "-parts").create();
- VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
- unionVertex.addDataSink("parts", od1);
-
- OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
- .newBuilder(Text.class.getName(), IntWritable.class.getName(),
- HashPartitioner.class.getName()).build();
-
- dag.addVertex(mapVertex1)
- .addVertex(mapVertex2)
- .addVertex(mapVertex3)
- .addVertex(checkerVertex)
- .addEdge(
- new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
- .addEdge(
- new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
- new InputDescriptor(
- ConcatenatedMergedKeyValuesInput.class.getName())));
- return dag;
- }
-
- private static void printUsage() {
- System.err.println("Usage: " + " unionexample <in1> <out1>");
- }
-
- public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
- System.out.println("Running UnionExample");
- // conf and UGI
- TezConfiguration tezConf;
- if (conf != null) {
- tezConf = new TezConfiguration(conf);
- } else {
- tezConf = new TezConfiguration();
- }
- UserGroupInformation.setConfiguration(tezConf);
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
- // staging dir
- FileSystem fs = FileSystem.get(tezConf);
- String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
- + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
- + Path.SEPARATOR + Long.toString(System.currentTimeMillis());
- Path stagingDir = new Path(stagingDirStr);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
- stagingDir = fs.makeQualified(stagingDir);
-
-
- // No need to add jar containing this class as assumed to be part of
- // the tez jars.
-
- // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
- // is the same filesystem as the one used for Input/Output.
-
- TezClient tezSession = new TezClient("UnionExampleSession", tezConf);
- tezSession.start();
-
- DAGClient dagClient = null;
-
- try {
- if (fs.exists(new Path(outputPath))) {
- throw new FileAlreadyExistsException("Output directory "
- + outputPath + " already exists");
- }
-
- Map<String, LocalResource> localResources =
- new TreeMap<String, LocalResource>();
-
- DAG dag = createDAG(fs, tezConf, localResources,
- stagingDir, inputPath, outputPath);
-
- tezSession.waitTillReady();
- dagClient = tezSession.submitDAG(dag);
-
- // monitoring
- DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
- if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
- System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
- return false;
- }
- return true;
- } finally {
- fs.delete(stagingDir, true);
- tezSession.stop();
- }
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- printUsage();
- System.exit(2);
- }
- UnionExample job = new UnionExample();
- job.run(args[0], args[1], null);
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
deleted file mode 100644
index 93ec860..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.helpers;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.google.common.base.Preconditions;
-
-public class SplitsInClientOptionParser {
-
- private CommandLine cmdLine;
- private String[] otherArgs;
-
- private boolean parsed = false;
-
- public SplitsInClientOptionParser() {
-
- }
-
- public String[] getRemainingArgs() {
- Preconditions.checkState(parsed,
- "Cannot get remaining args without parsing");
- return otherArgs;
- }
-
- @SuppressWarnings("static-access")
- public boolean parse(String[] args, boolean defaultVal) throws ParseException {
- Preconditions.checkState(parsed == false,
- "Craete a new instance for different option sets");
- parsed = true;
- Options opts = new Options();
- Option opt = OptionBuilder
- .withArgName("splits_in_client")
- .hasArg()
- .withDescription(
- "specify whether splits should be generated in the client")
- .create("generateSplitsInClient");
- opts.addOption(opt);
- CommandLineParser parser = new GnuParser();
-
- cmdLine = parser.parse(opts, args, false);
- if (cmdLine.hasOption("generateSplitsInClient")) {
- defaultVal = Boolean.parseBoolean(cmdLine
- .getOptionValue("generateSplitsInClient"));
- }
- otherArgs = cmdLine.getArgs();
- return defaultVal;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
deleted file mode 100644
index 0906086..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
+++ /dev/null
@@ -1,23 +0,0 @@
-<html>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<body>
-Hadoop example code.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
deleted file mode 100644
index 8239358..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-Yahoo2009.aux
-Yahoo2009.bbl
-Yahoo2009.blg
-Yahoo2009.log
-Yahoo2009.out
-Yahoo2009.pdf
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
deleted file mode 100644
index 1deb812..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
+++ /dev/null
@@ -1,370 +0,0 @@
-% Licensed to the Apache Software Foundation (ASF) under one
-% or more contributor license agreements. See the NOTICE file
-% distributed with this work for additional information
-% regarding copyright ownership. The ASF licenses this file
-% to you under the Apache License, Version 2.0 (the
-% "License"); you may not use this file except in compliance
-% with the License. You may obtain a copy of the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS,
-% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-% See the License for the specific language governing permissions and
-% limitations under the License.
-\documentclass{article}
-\usepackage[pdftex]{hyperref}
-\usepackage[pdftex]{graphicx}
-
-\title{Winning a 60 Second Dash with a Yellow Elephant}
-\author{\href{http://people.apache.org/~omalley}{Owen O'Malley} and
- \href{http://people.apache.org/~acmurthy}{Arun C. Murthy}\\
-\href{http://www.yahoo.com/}{Yahoo!}\\
-owen@yahoo-inc.com and acm@yahoo-inc.com}
-\date{April 2009}
-\begin{document}
-\maketitle
-\href{http://hadoop.apache.org/core}{Apache Hadoop} is an open source
-software framework that dramatically simplifies writing distributed
-data intensive applications. It provides a distributed file system,
-which is modeled after the Google File System\cite{gfs}, and a
-map/reduce\cite{mapreduce} implementation that manages distributed
-computation. Jim Gray defined a benchmark to compare large sorting
-programs. Since the core of map/reduce is a distributed sort, most of
-the custom code is glue to get the desired behavior.
-
-\section{Benchmark Rules}
-
-Jim Gray's sort benchmark consists of a set of many related
-benchmarks, each with their own rules. All of the sort benchmarks
-measure the time to sort different numbers of 100 byte records. The
-first 10 bytes of each record are the key and the rest is the
-value. The \textbf{minute sort} must finish end to end in less than a
-minute. The \textbf{Gray sort} must sort more than 100 terabytes and
-must run for at least an hour.
-
-\begin{itemize}
-\item The input data must precisely match the data generated by the C
- data generator.
-\item The input must not be in the operating system's file
- cache when the job starts. Under Linux, this requires using the memory for something
- else between sorting runs.
-\item The input and output data must not be compressed.
-\item The output must not overwrite the input.
-\item The output must be synced to disk.
-\item The 128 bit sum of the crc32's of each key/value pair must be
- calculated for the input and output. Naturally, they must be
- identical.
-\item The output may be divided into multiple output files, but it
- must be totally ordered (simply concatenating the output files must
- produce the completely sorted output).
-\item Starting and distributing the application to the cluster must be
- included in the execution time.
-\item Any sampling must be included in the execution time.
-\end{itemize}
-
-\section{Hadoop implementation}
-
-We extended the programs from last year to create and manipulate the
-new binary format and match the new rules. There are now 4 Hadoop
-map/reduce applications to support the benchmark:
-\begin{enumerate}
-\item \textbf{TeraGen} is a map/reduce program to generate the data.
-\item \textbf{TeraSort} samples the input data and uses map/reduce to
- sort the data into a total order.
-\item \textbf{TeraSum} is a map/reduce program that computes the 128 bit
- sum of the crc32 of each key/value pair.
-\item \textbf{TeraValidate} is a map/reduce program that validates the
- output is sorted and computes the sum of the checksums as TeraSum does.
-\end{enumerate}
-The update to the terasort programs will be checked in as
-\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716}.
-
-\textbf{TeraGen} generates input data for the sort that is byte for byte
-equivalent to the C version that was released in March of 2009,
-including specific keys and values. It divides the desired number of
-rows by the desired number of tasks and assigns ranges of rows to each
-map. The map jumps the random number generator to the correct value
-for the first row and generates the following rows.
-
-\textbf{TeraSort} is a standard map/reduce sort, except for a custom
-partitioner that ensures that all of the keys in reduce $N$ are after
-all of the keys in reduce $N-1$. This is a requirement of the contest
-so that the output of the sort is totally ordered, even if it is
-divided up by reduce.
-
-We wrote an input and output format, used by all 4 applications to
-read and write the files in the new format.
-
-\textbf{TeraSum} computes the 128 bit sum of the CRC32 of each
-key/value pair. Each map computes the sum of its input and emits a
-single 128 bit sum. There is a single reduce that adds the sums from
-each map. We used this program on the input directory to calculate the
-sum of the checksums of each key/value pair to check the correctness
-of the output of the sort. We also used TeraSum on a distinct dataset
-that was larger than the total RAM in the cluster to flush the Linux
-file cache between runs of the small (500 GB and 1TB) sorts.
-
-\textbf{TeraValidate} ensures that the output is globally sorted. It
-creates one map per file in the output directory and each map
-ensures that each key is greater than or equal to the previous one. The
-map also generates records with the first and last keys of the file
-and the reduce ensures that the first key of file $i$ is greater than or equal to
-the last key of file $i-1$. Any problems are reported as output of the
-reduce with the keys that are out of order. Additionally, TeraValidate
-calculates the sum of checksums of the output directory.
-
-\section{Hardware and Operating System}
-
-We ran our benchmarks on Yahoo's Hammer cluster. Hammer's hardware is
-very similar to the hardware that we used in last year's terabyte
-sort. The hardware and operating system details are:
-
-\begin{itemize}
-\item approximately 3800 nodes (in such a large cluster, nodes are
- always down)
-\item 2 quad core Xeons @ 2.5 GHz per node
-\item 4 SATA disks per node
-\item 8 GB RAM per node (upgraded to 16 GB before the petabyte sort)
-\item 1 gigabit ethernet on each node
-\item 40 nodes per rack
-\item 8 gigabit ethernet uplinks from each rack to the core
-\item Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
-\item Sun Java JDK (1.6.0\_05-b13 and 1.6.0\_13-b03) (32 and 64 bit)
-\end{itemize}
-
-We hit a JVM bug in 1.6.0\_05-b13 on the larger sorts (100TB and 1PB)
-and switched over to the later JVM, which resolved the issue. For the
-larger sorts, we used 64 bit JVMs for the Name Node and Job Tracker.
-
-\section{Software and Configuration}
-
-The version of Hadoop we used was a private branch of trunk that was
-started in January 2009, which is after the 0.20 branch was feature
-frozen. We used git to manage our branch and it allowed us to easily
-coordinate our work, track our changes, and resynchronize with the
-current Hadoop trunk.
-
-The changes include:
-
-\begin{enumerate}
-
-\item Updated the terasort example in the Hadoop code base to match
- the dataset defined by the rule changes in the benchmark from March
- of 2009.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716})
-
-\item We reimplemented the reducer side of Hadoop's shuffle. The
- redesign improved the performance of the shuffle and removed
- bottlenecks and over-throttling. It also made the code more
- maintainable and understandable by breaking a 3000 line Java file
- into multiple classes with a clean set of interfaces.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5223}{HADOOP-5223})
-
-\item The new shuffle also fetches multiple map outputs from the same
- node over each connection rather than one at a time. Fetching
- multiple map outputs at the same time avoids connection setup costs
- and also avoids the round trip while the server responds to the request.
- (\href{http://issues.apache.org/jira/browse/HADOOP-1338}{HADOOP-1338})
-
-\item Allowed configuring timeouts on the shuffle connections and we
- shortened them for the small sorts. We observed cases where the
- connections for the shuffle would hang until the timeout, which made
- low latency jobs impossibly long.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5789}{HADOOP-5789})
-
-\item Set TCP no-delay and more frequent pings between the Task and
- the Task Tracker to reduce latency in detecting problems.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5788}{HADOOP-5788})
-
-\item We added some protection code to detect incorrect data being
- transmitted in the shuffle from causing the reduce to fail. It
- appears this is either a JVM NIO bug or Jetty bug that likely
- affects 0.20 and trunk under heavy load.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5783}{HADOOP-5783})
-
-\item We used LZO compression on the map outputs. On the new dataset, LZO
- compresses down to 45\% of the original size. By comparison, the
- dataset from last year compresses to 20\% of the original size. Last
- year, the shuffle would run out of direct buffers if we used
- compression on the map outputs.
-
-\item We implemented memory to memory merges in the reduce during the
- shuffle to combine the map outputs in memory before we finish the
- shuffle, thereby reducing the work needed when the reduce is
- running.
-
-\item We multi-threaded the sampling code that read the input set to
- find the partition points between the reduces. We also wrote a
- simple partitioner that assumes the keys are evenly
- distributed. Since the new dataset does not require sampling, the
- simple partitioner produces very even partitions.
- (\href{http://issues.apache.org/jira/browse/HADOOP-4946}{HADOOP-4946})
-
-\item On the smaller clusters, we configured the system with faster
- heartbeat cycles from the Task Trackers to the Job Tracker (it
- defaults to 10 secs / 1000 nodes, but we made it configurable and
- brought it down to 2 seconds/1000 nodes to provide lower latency)
- (\href{http://issues.apache.org/jira/browse/HADOOP-5784}{HADOOP-5784})
-
-\item Typically the Job Tracker assigns tasks to Task Trackers on a
- first come first served basis. This greedy assignment of tasks did
- not lead to good data locality. However, by taking a global view and
- placing all of the map tasks at once, the system achieves much better
- locality. Rather than implement global scheduling for all of Hadoop,
- which would be much harder, we implemented a global scheduler for
- the terasort example in the input format. Basically, the input
- format computes the splits and assigns work to the nodes that have
- the fewest blocks first. For a node that has more blocks
- than map slots, it picks the block that has the fewest remaining
- locations. This greedy global algorithm seems to get very good
- locality. The input format would schedule the maps and then change
- the input split descriptions to only have a single location instead
- of the original 3. This increased task locality by 40\% or so over
- the greedy scheduler.
-
-\item Hadoop 0.20 added setup and cleanup tasks. Since they are not
- required for the sort benchmarks, we allow them to be disabled to
- reduce the latency of starting and stopping the job.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5785}{HADOOP-5785})
-
-\item We discovered a performance problem where in some contexts the
- cost of using the JNI-based CRC32 was very high. By implementing it
- in pure Java, the average case is a little slower, but the worst
- case is much better.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5598}{HADOOP-5598})
-
-\item We found and removed some hard-coded wait loops from the
- framework that don't matter for large jobs, but can seriously slow
- down low latency jobs.
-
-\item Allowed setting the logging level for the tasks, so that we
- could cut down on logging. When running for ``real'', we configure the
- logging level to WARN instead of the default INFO. Reducing the
- amount of logging has a huge impact on the performance of the
- system, but obviously makes debugging and analysis much harder.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5786}{HADOOP-5786})
-
-\item One optimization that we didn't finish is to optimize the job
- planning code. Currently, it uses an RPC to the Name Node for each
- input file, which we have observed taking a substantial amount of
- time. For the terabyte sort, our investigations show that we
- could save about 4 seconds out of the 8 that were spent on setting
- up the job.
- (\href{http://issues.apache.org/jira/browse/HADOOP-5795}{HADOOP-5795})
-
-\end{enumerate}
-
-\section{Results}
-
-Hadoop has made a lot of progress in the last year and we were able to
-run much lower latency jobs as well as much larger jobs. Note that in
-any large cluster and distributed application, there are a lot of
-moving pieces and thus we have seen a wide variation in execution
-times. As Hadoop evolves and becomes more graceful in the presence of
-hardware degradation and failure, this variation should smooth
-out. The best times for each of the listed sort sizes were:
-\\
-
-\begin{tabular}{| c | c | c | c | c | c |}
-\hline
-Bytes & Nodes & Maps & Reduces & Replication & Time \\
-\hline
-$5*10^{11}$ & 1406 & 8000 & 2600 & 1 & 59 seconds \\
-$10^{12}$ & 1460 & 8000 & 2700 & 1 & 62 seconds \\
-$10^{14}$ & 3452 & 190,000 & 10,000 & 2 & 173 minutes \\
-$10^{15}$ & 3658 & 80,000 & 20,000 & 2 & 975 minutes \\
-\hline
-\end{tabular}\\
-
-Within the rules for the 2009 Gray sort, our 500 GB sort set a new
-record for the minute sort and the 1PB sort set a new record of 1.03
-TB/minute. The 62 second terabyte sort would have set a new record,
-but the terabyte benchmark that we won last year has been
-retired. (Clearly the minute sort and terabyte sort are rapidly
-converging, and thus it is not a loss.) One piece of trivia is that
-only the petabyte dataset had any duplicate keys (40 of them).
-
-Because the smaller sorts needed lower latency and a faster network, we
-only used part of the cluster for those runs. In particular, instead
-of our normal 5:1 oversubscription between racks, we limited it to 16
-nodes in each rack for a 2:1 oversubscription. The smaller runs can
-also use an output replication of 1: because they take only minutes and
-run on smaller clusters, the likelihood of a node failing is
-fairly low. On the larger runs, failure is expected and thus
-replication of 2 is required. HDFS protects against data loss during
-rack failure by writing the second replica on a different rack and
-thus writing the second replica is relatively slow.
-
-We've included the timelines for the jobs counting from the job
-submission at the Job Tracker. The diagrams show the number of tasks
-running at each point in time. While maps only have a single phase,
-the reduces have three: \textbf{shuffle}, \textbf{merge}, and
-\textbf{reduce}. The shuffle is the transfer of the data from the
-maps. Merge doesn't happen in these benchmarks, because none of the
-reduces need multiple levels of merges. Finally, the reduce phase is
-where the final merge and writing to HDFS happens. We've also included
-a category named \textbf{waste} that represents task attempts that
-were running, but ended up either failing, or being killed (often as
-speculatively executed task attempts). The job logs and configuration
-for the four runs, which are the raw data for the charts, are
-available on
-\href{http://people.apache.org/~omalley/tera-2009/}{http://people.apache.org/~omalley/tera-2009/}.
-
-If you compare this year's charts to last year's, you'll notice that
-tasks are launching much faster now. Last year we only launched one
-task per heartbeat, so it took 40 seconds to get all of the tasks
-launched. Now, Hadoop will fill up a Task Tracker in a single
-heartbeat. Reducing that job launch overhead is very important
-for getting runs under a minute.
-
-As with last year, we ran with significantly larger tasks than the
-defaults for Hadoop. Even with the new more aggressive shuffle,
-minimizing the number of transfers (maps * reduces) is very important
-to the performance of the job. Notice that in the petabyte sort, each
-map is processing 15 GB instead of the default 128 MB and each reduce
-is handling 50 GB. When we ran the petabyte sort with a more typical
-value of 1.5 GB per map, it took 40 hours to finish. Therefore, to increase
-throughput, it makes sense to consider increasing the default block
-size, which translates into the default map size, to at least
-1 GB.
-
-\section{Comments on the Rule Changes}
-
-The group that runs the Gray Sort Benchmark made very substantial
-changes to the rules this year. The changes were not announced, but
-rather appeared on the website in March. We feel that it was too late
-to make rule changes and that the changes should have been deferred until
-next year. We'd also like to point out that while most of the changes to
-the data generator were positive, it was a poor choice to remove the
-skewed distribution of the keys. The previously skewed distribution
-required sampling of the input to pick good partition points between
-the reduces. The current dataset picks keys so completely at random that
-sampling is counterproductive and yields less even distributions between the
-reduces.
-
-\bibliographystyle{abbrv}
-\bibliography{tera}
-
-\begin{figure}[!p]
-\includegraphics[width=4.21in]{500GBTaskTime.png}
-\caption{500 GB sort tasks across time}\label{500GbTimeline}
-\end{figure}
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{1TBTaskTime.png}
-\caption{1 TB sort tasks across time}\label{1TbTimeline}
-\end{figure}
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{100TBTaskTime.png}
-\caption{100 TB sort tasks across time}\label{100TbTimeline}
-\end{figure}
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{1PBTaskTime.png}
-\caption{1 PB sort tasks across time}\label{1PbTimeline}
-\end{figure}
-
-\end{document}
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
deleted file mode 100644
index 50a797f..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
+++ /dev/null
@@ -1,31 +0,0 @@
-% Licensed to the Apache Software Foundation (ASF) under one
-% or more contributor license agreements. See the NOTICE file
-% distributed with this work for additional information
-% regarding copyright ownership. The ASF licenses this file
-% to you under the Apache License, Version 2.0 (the
-% "License"); you may not use this file except in compliance
-% with the License. You may obtain a copy of the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS,
-% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-% See the License for the specific language governing permissions and
-% limitations under the License.
-@INPROCEEDINGS{mapreduce,
- AUTHOR = "Jeffrey Dean and Sanjay Ghemawat",
- TITLE = "MapReduce: Simplified Data Processing on Large Clusters",
- BOOKTITLE = "Sixth Symposium on Operating System Design and Implementation",
- MONTH = "December",
- ADDRESS = "San Francisco, CA",
- YEAR = {2004} }
-
-@INPROCEEDINGS{gfs,
- AUTHOR = "Sanjay Ghemawat and Howard Gobioff and Shun-Tak Leung",
- TITLE = "The Google File System",
- BOOKTITLE = "19th Symposium on Operating Systems Principles",
- ORGANIZATION = "ACM",
- MONTH = "October",
- ADDRESS = "Lake George, NY",
- YEAR = {2003} }