Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/16 02:54:55 UTC

[07/10] TEZ-1055. Rename tez-mapreduce-examples to tez-examples (Hitesh Shah via bikas)

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
deleted file mode 100644
index c9b51c7..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This program uses map/reduce to just run a distributed job where there is
- * no interaction between the tasks and each task writes a large unsorted
- * random binary sequence file of BytesWritable.
- * In order for this program to generate data for terasort with 10-byte keys
- * and 90-byte values, have the following config:
- * <xmp>
- * <?xml version="1.0"?>
- * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- * <configuration>
- *   <property>
- *     <name>mapreduce.randomwriter.minkey</name>
- *     <value>10</value>
- *   </property>
- *   <property>
- *     <name>mapreduce.randomwriter.maxkey</name>
- *     <value>10</value>
- *   </property>
- *   <property>
- *     <name>mapreduce.randomwriter.minvalue</name>
- *     <value>90</value>
- *   </property>
- *   <property>
- *     <name>mapreduce.randomwriter.maxvalue</name>
- *     <value>90</value>
- *   </property>
- *   <property>
- *     <name>mapreduce.randomwriter.totalbytes</name>
- *     <value>1099511627776</value>
- *   </property>
- * </configuration></xmp>
- * 
- * Equivalently, {@link RandomWriter} also supports all the above options
- * and ones supported by {@link GenericOptionsParser} via the command-line.
- */
-public class RandomWriter extends Configured implements Tool {
-  public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
-  public static final String BYTES_PER_MAP = 
-    "mapreduce.randomwriter.bytespermap";
-  public static final String MAPS_PER_HOST = 
-    "mapreduce.randomwriter.mapsperhost";
-  public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
-  public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
-  public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
-  public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
-  
-  /**
-   * User counters
-   */
-  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
-  
-  /**
-   * A custom input format that creates virtual inputs of a single string
-   * for each map.
-   */
-  static class RandomInputFormat extends InputFormat<Text, Text> {
-
-    /** 
-     * Generate the requested number of file splits, with the filename
-     * set to the filename of the output file.
-     */
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      List<InputSplit> result = new ArrayList<InputSplit>();
-      Path outDir = FileOutputFormat.getOutputPath(job);
-      int numSplits = 
-            job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
-      for(int i=0; i < numSplits; ++i) {
-        result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
-                                  (String[])null));
-      }
-      return result;
-    }
-
-    /**
-     * Return a single record (filename, "") where the filename is taken from
-     * the file split.
-     */
-    static class RandomRecordReader extends RecordReader<Text, Text> {
-      Path name;
-      Text key = null;
-      Text value = new Text();
-      public RandomRecordReader(Path p) {
-        name = p;
-      }
-      
-      public void initialize(InputSplit split,
-                             TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    	  
-      }
-      
-      public boolean nextKeyValue() {
-        if (name != null) {
-          key = new Text();
-          key.set(name.getName());
-          name = null;
-          return true;
-        }
-        return false;
-      }
-      
-      public Text getCurrentKey() {
-        return key;
-      }
-      
-      public Text getCurrentValue() {
-        return value;
-      }
-      
-      public void close() {}
-
-      public float getProgress() {
-        return 0.0f;
-      }
-    }
-
-    public RecordReader<Text, Text> createRecordReader(InputSplit split,
-        TaskAttemptContext context) throws IOException, InterruptedException {
-      return new RandomRecordReader(((FileSplit) split).getPath());
-    }
-  }
-
-  static class RandomMapper extends Mapper<WritableComparable, Writable,
-                      BytesWritable, BytesWritable> {
-    
-    private long numBytesToWrite;
-    private int minKeySize;
-    private int keySizeRange;
-    private int minValueSize;
-    private int valueSizeRange;
-    private Random random = new Random();
-    private BytesWritable randomKey = new BytesWritable();
-    private BytesWritable randomValue = new BytesWritable();
-    
-    private void randomizeBytes(byte[] data, int offset, int length) {
-      for(int i=offset + length - 1; i >= offset; --i) {
-        data[i] = (byte) random.nextInt(256);
-      }
-    }
-    
-    /**
-     * Given an output filename, write a bunch of random records to it.
-     */
-    public void map(WritableComparable key, 
-                    Writable value,
-                    Context context) throws IOException,InterruptedException {
-      int itemCount = 0;
-      while (numBytesToWrite > 0) {
-        int keyLength = minKeySize + 
-          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
-        randomKey.setSize(keyLength);
-        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
-        int valueLength = minValueSize +
-          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
-        randomValue.setSize(valueLength);
-        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
-        context.write(randomKey, randomValue);
-        numBytesToWrite -= keyLength + valueLength;
-        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
-        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
-        if (++itemCount % 200 == 0) {
-          context.setStatus("wrote record " + itemCount + ". " + 
-                             numBytesToWrite + " bytes left.");
-        }
-      }
-      context.setStatus("done with " + itemCount + " records.");
-    }
-    
-    /**
-     * Save the values out of the configuration that we need to write
-     * the data.
-     */
-    @Override
-    public void setup(Context context) {
-      Configuration conf = context.getConfiguration();
-      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
-                                    1*1024*1024*1024);
-      minKeySize = conf.getInt(MIN_KEY, 10);
-      keySizeRange = 
-        conf.getInt(MAX_KEY, 1000) - minKeySize;
-      minValueSize = conf.getInt(MIN_VALUE, 0);
-      valueSizeRange = 
-        conf.getInt(MAX_VALUE, 20000) - minValueSize;
-    }
-  }
-  
-  /**
-   * This is the main routine for launching a distributed random write job.
-   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
-   * The reduce doesn't do anything.
-   * 
-   * @throws IOException 
-   */
-  public int run(String[] args) throws Exception {    
-    if (args.length == 0) {
-      System.out.println("Usage: writer <out-dir>");
-      ToolRunner.printGenericCommandUsage(System.out);
-      return 2;
-    }
-    
-    Path outDir = new Path(args[0]);
-    Configuration conf = getConf();
-    JobClient client = new JobClient(conf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
-    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
-                                             1*1024*1024*1024);
-    if (numBytesToWritePerMap == 0) {
-      System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
-      return -2;
-    }
-    long totalBytesToWrite = conf.getLong(TOTAL_BYTES, 
-         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
-    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
-    if (numMaps == 0 && totalBytesToWrite > 0) {
-      numMaps = 1;
-      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
-    }
-    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
-
-    Job job = new Job(conf);
-    
-    job.setJarByClass(RandomWriter.class);
-    job.setJobName("random-writer");
-    FileOutputFormat.setOutputPath(job, outDir);
-    job.setOutputKeyClass(BytesWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
-    job.setInputFormatClass(RandomInputFormat.class);
-    job.setMapperClass(RandomMapper.class);        
-    job.setReducerClass(Reducer.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    
-    System.out.println("Running " + numMaps + " maps.");
-    
-    // reducer NONE
-    job.setNumReduceTasks(0);
-    
-    Date startTime = new Date();
-    System.out.println("Job started: " + startTime);
-    int ret = job.waitForCompletion(true) ? 0 : 1;
-    Date endTime = new Date();
-    System.out.println("Job ended: " + endTime);
-    System.out.println("The job took " + 
-                       (endTime.getTime() - startTime.getTime()) /1000 + 
-                       " seconds.");
-    
-    return ret;
-  }
-  
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
-    System.exit(res);
-  }
-
-}

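For reference, the deleted RandomWriter is a plain Tool driven entirely by
Configuration keys, so it can be launched programmatically as well as from the
command line. A minimal launcher sketch using the terasort-style settings from
its javadoc (the launcher class name and output path are illustrative, and the
package is the pre-rename one shown above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.mapreduce.examples.RandomWriter;

    public class RandomWriterLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 10-byte keys, 90-byte values, 1 TB total -- the terasort settings
        // documented in the RandomWriter javadoc.
        conf.setInt(RandomWriter.MIN_KEY, 10);
        conf.setInt(RandomWriter.MAX_KEY, 10);
        conf.setInt(RandomWriter.MIN_VALUE, 90);
        conf.setInt(RandomWriter.MAX_VALUE, 90);
        conf.setLong(RandomWriter.TOTAL_BYTES, 1099511627776L);
        // RandomWriter takes a single argument: the output directory.
        int rc = ToolRunner.run(conf, new RandomWriter(),
            new String[] { "/tmp/random-writer-out" });
        System.exit(rc);
      }
    }
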
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
deleted file mode 100644
index cdae905..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/SecondarySort.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This is an example Hadoop Map/Reduce application.
- * It reads the text input files that must contain two integers per line.
- * The output is sorted by the first and second number and grouped on the 
- * first number.
- *
- * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
- *            <i>in-dir</i> <i>out-dir</i> 
- */
-public class SecondarySort extends Configured implements Tool {
- 
-  /**
-   * Define a pair of integers that are writable.
-   * They are serialized in a byte comparable format.
-   */
-  public static class IntPair 
-                      implements WritableComparable<IntPair> {
-    private int first = 0;
-    private int second = 0;
-    
-    /**
-     * Set the left and right values.
-     */
-    public void set(int left, int right) {
-      first = left;
-      second = right;
-    }
-    public int getFirst() {
-      return first;
-    }
-    public int getSecond() {
-      return second;
-    }
-    /**
-     * Read the two integers. 
-     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
-     */
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      first = in.readInt() + Integer.MIN_VALUE;
-      second = in.readInt() + Integer.MIN_VALUE;
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(first - Integer.MIN_VALUE);
-      out.writeInt(second - Integer.MIN_VALUE);
-    }
-    @Override
-    public int hashCode() {
-      return first * 157 + second;
-    }
-    @Override
-    public boolean equals(Object right) {
-      if (right instanceof IntPair) {
-        IntPair r = (IntPair) right;
-        return r.first == first && r.second == second;
-      } else {
-        return false;
-      }
-    }
-    /** A Comparator that compares serialized IntPair. */ 
-    public static class Comparator extends WritableComparator {
-      public Comparator() {
-        super(IntPair.class);
-      }
-
-      public int compare(byte[] b1, int s1, int l1,
-                         byte[] b2, int s2, int l2) {
-        return compareBytes(b1, s1, l1, b2, s2, l2);
-      }
-    }
-
-    static {                                        // register this comparator
-      WritableComparator.define(IntPair.class, new Comparator());
-    }
-
-    @Override
-    public int compareTo(IntPair o) {
-      if (first != o.first) {
-        return first < o.first ? -1 : 1;
-      } else if (second != o.second) {
-        return second < o.second ? -1 : 1;
-      } else {
-        return 0;
-      }
-    }
-  }
-  
-  /**
-   * Partition based on the first part of the pair.
-   */
-  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
-    @Override
-    public int getPartition(IntPair key, IntWritable value, 
-                            int numPartitions) {
-      return Math.abs(key.getFirst() * 127) % numPartitions;
-    }
-  }
-
-  /**
-   * Compare only the first part of the pair, so that reduce is called once
-   * for each value of the first part.
-   */
-  public static class FirstGroupingComparator 
-                implements RawComparator<IntPair> {
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
-                                             b2, s2, Integer.SIZE/8);
-    }
-
-    @Override
-    public int compare(IntPair o1, IntPair o2) {
-      int l = o1.getFirst();
-      int r = o2.getFirst();
-      return l == r ? 0 : (l < r ? -1 : 1);
-    }
-  }
-
-  /**
-   * Read two integers from each line and generate a key, value pair
-   * as ((left, right), right).
-   */
-  public static class MapClass 
-         extends Mapper<LongWritable, Text, IntPair, IntWritable> {
-    
-    private final IntPair key = new IntPair();
-    private final IntWritable value = new IntWritable();
-    
-    @Override
-    public void map(LongWritable inKey, Text inValue, 
-                    Context context) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(inValue.toString());
-      int left = 0;
-      int right = 0;
-      if (itr.hasMoreTokens()) {
-        left = Integer.parseInt(itr.nextToken());
-        if (itr.hasMoreTokens()) {
-          right = Integer.parseInt(itr.nextToken());
-        }
-        key.set(left, right);
-        value.set(right);
-        context.write(key, value);
-      }
-    }
-  }
-  
-  /**
-   * A reducer class that emits a separator line for each key group, followed by the grouped values.
-   */
-  public static class Reduce 
-         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
-    private static final Text SEPARATOR = 
-      new Text("------------------------------------------------");
-    private final Text first = new Text();
-    
-    @Override
-    public void reduce(IntPair key, Iterable<IntWritable> values,
-                       Context context
-                       ) throws IOException, InterruptedException {
-      context.write(SEPARATOR, null);
-      first.set(Integer.toString(key.getFirst()));
-      for(IntWritable value: values) {
-        context.write(first, value);
-      }
-    }
-  }
-  
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: secondarysort <in> <out>");
-      ToolRunner.printGenericCommandUsage(System.out);
-      System.exit(2);
-    }
-    Job job = new Job(conf, "secondary sort");
-    job.setJarByClass(SecondarySort.class);
-    job.setMapperClass(MapClass.class);
-    job.setReducerClass(Reduce.class);
-
-    // group and partition by the first int in the pair
-    job.setPartitionerClass(FirstPartitioner.class);
-    job.setGroupingComparatorClass(FirstGroupingComparator.class);
-
-    // the map output is IntPair, IntWritable
-    job.setMapOutputKeyClass(IntPair.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // the reduce output is Text, IntWritable
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    return job.waitForCompletion(true) ? 0 : 1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new SecondarySort(), args);
-    System.exit(res);
-  }
-}

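For reference, a small worked example of the deleted SecondarySort. Given an
input file containing the (hypothetical) lines

    20 25
    10 15
    10 5

the pairs are sorted by both integers but reduce calls are grouped on the
first one, so with the default single reducer and the default TextOutputFormat
tab separator the output is

    ------------------------------------------------
    10	5
    10	15
    ------------------------------------------------
    20	25
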
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
deleted file mode 100644
index 4eb5484..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/Sort.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.net.URI;
-import java.util.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * This is the trivial map/reduce program that does absolutely nothing
- * other than use the framework to fragment and sort the input values.
- *
- * To run: bin/hadoop jar build/hadoop-examples.jar sort
- *            [-r <i>reduces</i>]
- *            [-inFormat <i>input format class</i>] 
- *            [-outFormat <i>output format class</i>] 
- *            [-outKey <i>output key class</i>] 
- *            [-outValue <i>output value class</i>] 
- *            [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
- *            <i>in-dir</i> <i>out-dir</i> 
- */
-public class Sort<K,V> extends Configured implements Tool {
-  public static final String REDUCES_PER_HOST = 
-    "mapreduce.sort.reducesperhost";
-  private Job job = null;
-
-  static int printUsage() {
-    System.out.println("sort [-r <reduces>] " +
-                       "[-inFormat <input format class>] " +
-                       "[-outFormat <output format class>] " + 
-                       "[-outKey <output key class>] " +
-                       "[-outValue <output value class>] " +
-                       "[-totalOrder <pcnt> <num samples> <max splits>] " +
-                       "<input> <output>");
-    ToolRunner.printGenericCommandUsage(System.out);
-    return 2;
-  }
-
-  /**
-   * The main driver for sort program.
-   * Invoke this method to submit the map/reduce job.
-   * @throws IOException When there are communication problems with the
-   *                     job tracker.
-   */
-  public int run(String[] args) throws Exception {
-
-    Configuration conf = getConf();
-    JobClient client = new JobClient(conf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = conf.get(REDUCES_PER_HOST);
-    if (sort_reduces != null) {
-       num_reduces = cluster.getTaskTrackers() * 
-                       Integer.parseInt(sort_reduces);
-    }
-    Class<? extends InputFormat> inputFormatClass = 
-      SequenceFileInputFormat.class;
-    Class<? extends OutputFormat> outputFormatClass = 
-      SequenceFileOutputFormat.class;
-    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
-    Class<? extends Writable> outputValueClass = BytesWritable.class;
-    List<String> otherArgs = new ArrayList<String>();
-    InputSampler.Sampler<K,V> sampler = null;
-    for(int i=0; i < args.length; ++i) {
-      try {
-        if ("-r".equals(args[i])) {
-          num_reduces = Integer.parseInt(args[++i]);
-        } else if ("-inFormat".equals(args[i])) {
-          inputFormatClass = 
-            Class.forName(args[++i]).asSubclass(InputFormat.class);
-        } else if ("-outFormat".equals(args[i])) {
-          outputFormatClass = 
-            Class.forName(args[++i]).asSubclass(OutputFormat.class);
-        } else if ("-outKey".equals(args[i])) {
-          outputKeyClass = 
-            Class.forName(args[++i]).asSubclass(WritableComparable.class);
-        } else if ("-outValue".equals(args[i])) {
-          outputValueClass = 
-            Class.forName(args[++i]).asSubclass(Writable.class);
-        } else if ("-totalOrder".equals(args[i])) {
-          double pcnt = Double.parseDouble(args[++i]);
-          int numSamples = Integer.parseInt(args[++i]);
-          int maxSplits = Integer.parseInt(args[++i]);
-          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
-          sampler =
-            new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
-        } else {
-          otherArgs.add(args[i]);
-        }
-      } catch (NumberFormatException except) {
-        System.out.println("ERROR: Integer expected instead of " + args[i]);
-        return printUsage();
-      } catch (ArrayIndexOutOfBoundsException except) {
-        System.out.println("ERROR: Required parameter missing from " +
-            args[i-1]);
-        return printUsage(); // exits
-      }
-    }
-    // Set user-supplied (possibly default) job configs
-    job = new Job(conf);
-    job.setJobName("sorter");
-    job.setJarByClass(Sort.class);
-
-    job.setMapperClass(Mapper.class);        
-    job.setReducerClass(Reducer.class);
-
-    job.setNumReduceTasks(num_reduces);
-
-    job.setInputFormatClass(inputFormatClass);
-    job.setOutputFormatClass(outputFormatClass);
-
-    job.setOutputKeyClass(outputKeyClass);
-    job.setOutputValueClass(outputValueClass);
-
-    // Make sure there are exactly 2 parameters left.
-    if (otherArgs.size() != 2) {
-      System.out.println("ERROR: Wrong number of parameters: " +
-          otherArgs.size() + " instead of 2.");
-      return printUsage();
-    }
-    FileInputFormat.setInputPaths(job, otherArgs.get(0));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
-    
-    if (sampler != null) {
-      System.out.println("Sampling input to effect total-order sort...");
-      job.setPartitionerClass(TotalOrderPartitioner.class);
-      Path inputDir = FileInputFormat.getInputPaths(job)[0];
-      inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
-      Path partitionFile = new Path(inputDir, "_sortPartitioning");
-      TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
-      InputSampler.<K,V>writePartitionFile(job, sampler);
-      URI partitionUri = new URI(partitionFile.toString() +
-                                 "#" + "_sortPartitioning");
-      DistributedCache.addCacheFile(partitionUri, conf);
-    }
-
-    System.out.println("Running on " +
-        cluster.getTaskTrackers() +
-        " nodes to sort from " + 
-        FileInputFormat.getInputPaths(job)[0] + " into " +
-        FileOutputFormat.getOutputPath(job) +
-        " with " + num_reduces + " reduces.");
-    Date startTime = new Date();
-    System.out.println("Job started: " + startTime);
-    int ret = job.waitForCompletion(true) ? 0 : 1;
-    Date end_time = new Date();
-    System.out.println("Job ended: " + end_time);
-    System.out.println("The job took " + 
-        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-    return ret;
-  }
-
-
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new Sort(), args);
-    System.exit(res);
-  }
-
-  /**
-   * Get the last job that was run using this instance.
-   * @return the results of the last job that was run
-   */
-  public Job getResult() {
-    return job;
-  }
-}

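For reference, the deleted Sort tool is normally driven through ToolRunner
with the options listed in printUsage(). A minimal launcher sketch (the class
name, paths, and reducer count are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.mapreduce.examples.Sort;

    public class SortLauncher {
      public static void main(String[] args) throws Exception {
        Sort sorter = new Sort();
        // Sort SequenceFile input into SequenceFile output with 4 reducers;
        // -r mirrors the usage string printed by Sort.printUsage().
        int rc = ToolRunner.run(new Configuration(), sorter,
            new String[] { "-r", "4", "/tmp/sort-in", "/tmp/sort-out" });
        // sorter.getResult() returns the Job handle of the last run, if needed.
        System.exit(rc);
      }
    }
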
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
deleted file mode 100644
index 72c14fc..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.PreWarmVertex;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutputLegacy;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An MRR job built on top of word count to return words sorted by
- * their frequency of occurrence.
- *
- * Use -DUSE_TEZ_SESSION=true to run jobs in session mode.
- * If multiple input/output pairs are provided, this job will process each pair
- * as a separate DAG in a sequential manner.
- * Use -DINTER_JOB_SLEEP_INTERVAL=<N> where N is the sleep interval in seconds
- * between the sequential DAGs.
- */
-public class TestOrderedWordCount extends Configured implements Tool {
-
-  private static Log LOG = LogFactory.getLog(TestOrderedWordCount.class);
-
-  public static class TokenizerMapper
-       extends Mapper<Object, Text, Text, IntWritable>{
-
-    private final static IntWritable one = new IntWritable(1);
-    private Text word = new Text();
-
-    public void map(Object key, Text value, Context context
-                    ) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
-      }
-    }
-  }
-
-  public static class IntSumReducer
-       extends Reducer<Text,IntWritable,IntWritable, Text> {
-    private IntWritable result = new IntWritable();
-
-    public void reduce(Text key, Iterable<IntWritable> values,
-                       Context context
-                       ) throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
-      }
-      result.set(sum);
-      context.write(result, key);
-    }
-  }
-
-  /**
-   * Shuffle ensures ordering based on the per-word counts, hence the final
-   * reducer is a no-op and just emits each word with its count.
-   */
-  public static class MyOrderByNoOpReducer
-      extends Reducer<IntWritable, Text, Text, IntWritable> {
-
-    public void reduce(IntWritable key, Iterable<Text> values,
-        Context context
-        ) throws IOException, InterruptedException {
-      for (Text word : values) {
-        context.write(word, key);
-      }
-    }
-  }
-
-  private Credentials credentials = new Credentials();
-
-  @VisibleForTesting
-  public DAG createDAG(FileSystem fs, Configuration conf,
-      Map<String, LocalResource> commonLocalResources, Path stagingDir,
-      int dagIndex, String inputPath, String outputPath,
-      boolean generateSplitsInClient) throws Exception {
-
-    Configuration mapStageConf = new JobConf(conf);
-    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
-        TokenizerMapper.class.getName());
-
-    MRHelpers.translateMRConfToTez(mapStageConf);
-
-    Configuration iReduceStageConf = new JobConf(conf);
-    // TODO replace with auto-reduce parallelism
-    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
-    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
-        IntSumReducer.class.getName());
-    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
-    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
-        IntWritable.class.getName());
-    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
-    MRHelpers.translateMRConfToTez(iReduceStageConf);
-
-    Configuration finalReduceConf = new JobConf(conf);
-    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
-    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
-        MyOrderByNoOpReducer.class.getName());
-    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
-    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
-    MRHelpers.translateMRConfToTez(finalReduceConf);
-
-    MRHelpers.configureMRApiUsage(mapStageConf);
-    MRHelpers.configureMRApiUsage(iReduceStageConf);
-    MRHelpers.configureMRApiUsage(finalReduceConf);
-
-    List<Vertex> vertices = new ArrayList<Vertex>();
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
-    mapStageConf.writeXml(outputStream);
-    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
-    DataSourceDescriptor dsd;
-    if (generateSplitsInClient) {
-      mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-          TextInputFormat.class.getName());
-      mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
-      mapStageConf.setBoolean("mapred.mapper.new-api", true);
-      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
-    } else {
-      dsd = MRInputLegacy.createConfigurer(mapStageConf, TextInputFormat.class, inputPath).create();
-    }
-
-    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(
-        TezUtils.createUserPayloadFromConf(mapStageConf))
-        .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
-    mapVertex.addDataSource("MRInput", dsd);
-    vertices.add(mapVertex);
-
-    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
-    iReduceStageConf.writeXml(iROutputStream);
-    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
-    Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
-        ReduceProcessor.class.getName())
-            .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
-            .setHistoryText(iReduceStageHistoryText), 2);
-    ivertex.setTaskLocalFiles(commonLocalResources);
-    vertices.add(ivertex);
-
-    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
-    finalReduceConf.writeXml(finalReduceOutputStream);
-    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
-    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
-    Vertex finalReduceVertex = new Vertex("finalreduce",
-        new ProcessorDescriptor(
-            ReduceProcessor.class.getName())
-                .setUserPayload(finalReducePayload)
-                .setHistoryText(finalReduceStageHistoryText), 1);
-    finalReduceVertex.setTaskLocalFiles(commonLocalResources);
-    finalReduceVertex.addDataSink("MROutput",
-        MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
-            .create());
-    vertices.add(finalReduceVertex);
-
-    DAG dag = new DAG("OrderedWordCount" + dagIndex);
-    for (int i = 0; i < vertices.size(); ++i) {
-      dag.addVertex(vertices.get(i));
-    }
-
-    OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
-            HashPartitioner.class.getName()).setFromConfiguration(conf)
-	    .configureInput().useLegacyInput().done().build();
-    dag.addEdge(
-        new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
-            edgeConf1.createDefaultEdgeProperty()));
-
-    OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
-            HashPartitioner.class.getName()).setFromConfiguration(conf)
-            .configureInput().useLegacyInput().done().build();
-    dag.addEdge(
-        new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
-            edgeConf2.createDefaultEdgeProperty()));
-
-    return dag;
-  }
-
-  private static void printUsage() {
-    String options = " [-generateSplitsInClient true/<false>]";
-    System.err.println("Usage: testorderedwordcount <in> <out>" + options);
-    System.err.println("Usage (In Session Mode):"
-        + " testorderedwordcount <in1> <out1> ... <inN> <outN>" + options);
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
-    boolean generateSplitsInClient;
-
-    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
-    try {
-      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
-      otherArgs = splitCmdLineParser.getRemainingArgs();
-    } catch (ParseException e1) {
-      System.err.println("Invalid options");
-      printUsage();
-      return 2;
-    }
-
-    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
-    long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
-        * 1000;
-
-    boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
-
-    if (((otherArgs.length%2) != 0)
-        || (!useTezSession && otherArgs.length != 2)) {
-      printUsage();
-      return 2;
-    }
-
-    List<String> inputPaths = new ArrayList<String>();
-    List<String> outputPaths = new ArrayList<String>();
-
-    for (int i = 0; i < otherArgs.length; i+=2) {
-      inputPaths.add(otherArgs[i]);
-      outputPaths.add(otherArgs[i+1]);
-    }
-
-    UserGroupInformation.setConfiguration(conf);
-
-    TezConfiguration tezConf = new TezConfiguration(conf);
-    TestOrderedWordCount instance = new TestOrderedWordCount();
-
-    FileSystem fs = FileSystem.get(conf);
-
-    String stagingDirStr =  conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
-            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR + 
-            Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
-    pathFs.mkdirs(new Path(stagingDirStr));
-
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
-    
-    TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
-    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
-    // No need to add jar containing this class as assumed to be part of
-    // the tez jars.
-
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-    
-    if (useTezSession) {
-      LOG.info("Creating Tez Session");
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
-    } else {
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-    }
-    TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
-        null, instance.credentials);
-    tezSession.start();
-
-    DAGStatus dagStatus = null;
-    DAGClient dagClient = null;
-    String[] vNames = { "initialmap", "intermediate_reducer",
-      "finalreduce" };
-
-    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-    try {
-      for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
-        if (dagIndex != 1
-            && interJobSleepTimeout > 0) {
-          try {
-            LOG.info("Sleeping between jobs, sleepInterval="
-                + (interJobSleepTimeout/1000));
-            Thread.sleep(interJobSleepTimeout);
-          } catch (InterruptedException e) {
-            LOG.info("Main thread interrupted. Breaking out of job loop");
-            break;
-          }
-        }
-
-        String inputPath = inputPaths.get(dagIndex-1);
-        String outputPath = outputPaths.get(dagIndex-1);
-
-        if (fs.exists(new Path(outputPath))) {
-          throw new FileAlreadyExistsException("Output directory "
-              + outputPath + " already exists");
-        }
-        LOG.info("Running OrderedWordCount DAG"
-            + ", dagIndex=" + dagIndex
-            + ", inputPath=" + inputPath
-            + ", outputPath=" + outputPath);
-
-        Map<String, LocalResource> localResources =
-          new TreeMap<String, LocalResource>();
-        
-        DAG dag = instance.createDAG(fs, conf, localResources,
-            stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient);
-
-        boolean doPreWarm = dagIndex == 1 && useTezSession
-            && conf.getBoolean("PRE_WARM_SESSION", true);
-        int preWarmNumContainers = 0;
-        if (doPreWarm) {
-          preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
-          if (preWarmNumContainers <= 0) {
-            doPreWarm = false;
-          }
-        }
-        if (doPreWarm) {
-          LOG.info("Pre-warming Session");
-          PreWarmVertex preWarmVertex = new PreWarmVertex("PreWarm", preWarmNumContainers, dag
-              .getVertex("initialmap").getTaskResource());
-          preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
-          preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
-          preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
-          
-          tezSession.preWarm(preWarmVertex);
-        }
-
-        if (useTezSession) {
-          LOG.info("Waiting for TezSession to get into ready state");
-          waitForTezSessionReady(tezSession);
-          LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
-          dagClient = tezSession.submitDAG(dag);
-          LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
-        } else {
-          LOG.info("Submitting DAG as a new Tez Application");
-          dagClient = tezSession.submitDAG(dag);
-        }
-
-        while (true) {
-          dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          if (dagStatus.getState() == DAGStatus.State.RUNNING ||
-              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-              dagStatus.getState() == DAGStatus.State.FAILED ||
-              dagStatus.getState() == DAGStatus.State.KILLED ||
-              dagStatus.getState() == DAGStatus.State.ERROR) {
-            break;
-          }
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            // continue;
-          }
-        }
-
-
-        while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
-            dagStatus.getState() != DAGStatus.State.FAILED &&
-            dagStatus.getState() != DAGStatus.State.KILLED &&
-            dagStatus.getState() != DAGStatus.State.ERROR) {
-          if (dagStatus.getState() == DAGStatus.State.RUNNING) {
-            ExampleDriver.printDAGStatus(dagClient, vNames);
-          }
-          try {
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              // continue;
-            }
-            dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          } catch (TezException e) {
-            LOG.fatal("Failed to get application progress. Exiting");
-            return -1;
-          }
-        }
-        ExampleDriver.printDAGStatus(dagClient, vNames,
-            true, true);
-        LOG.info("DAG " + dagIndex + " completed. "
-            + "FinalState=" + dagStatus.getState());
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          LOG.info("DAG " + dagIndex + " diagnostics: "
-            + dagStatus.getDiagnostics());
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Error occurred when submitting/running DAGs", e);
-      throw e;
-    } finally {
-      if (!retainStagingDir) {
-        pathFs.delete(stagingDir, true);
-      }
-      LOG.info("Shutting down session");
-      tezSession.stop();
-    }
-
-    if (!useTezSession) {
-      ExampleDriver.printDAGStatus(dagClient, vNames);
-      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    }
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-
-  private static void waitForTezSessionReady(TezClient tezSession)
-    throws IOException, TezException, InterruptedException {
-    tezSession.waitTillReady();
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new TestOrderedWordCount(), args);
-    System.exit(res);
-  }
-}

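For reference, the deleted TestOrderedWordCount reads its session-related
switches straight from the Configuration (USE_TEZ_SESSION,
INTER_JOB_SLEEP_INTERVAL, PRE_WARM_SESSION, ...), so the -D options from its
javadoc can equally be set in code. A minimal sketch that submits two
input/output pairs as sequential DAGs in one Tez session (class name and paths
are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.mapreduce.examples.TestOrderedWordCount;

    public class OrderedWordCountSessionLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("USE_TEZ_SESSION", true);    // run both DAGs in one session
        conf.setInt("INTER_JOB_SLEEP_INTERVAL", 5);  // sleep 5 seconds between DAGs
        conf.setBoolean("PRE_WARM_SESSION", false);  // skip container pre-warming
        int rc = ToolRunner.run(conf, new TestOrderedWordCount(),
            new String[] { "/tmp/in1", "/tmp/out1", "/tmp/in2", "/tmp/out2" });
        System.exit(rc);
      }
    }
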
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
deleted file mode 100644
index a9b23e0..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.GroupInputEdge;
-import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Output;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-public class UnionExample {
-
-  public static class TokenProcessor extends SimpleMRProcessor {
-    IntWritable one = new IntWritable(1);
-    Text word = new Text();
-
-    public TokenProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkArgument(getInputs().size() == 1);
-      boolean inUnion = true;
-      if (getContext().getTaskVertexName().equals("map3")) {
-        inUnion = false;
-      }
-      Preconditions.checkArgument(getOutputs().size() == (inUnion ? 2 : 1));
-      Preconditions.checkArgument(getOutputs().containsKey("checker"));
-      MRInput input = (MRInput) getInputs().values().iterator().next();
-      KeyValueReader kvReader = input.getReader();
-      Output output =  getOutputs().get("checker");
-      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
-      MROutput parts = null;
-      KeyValueWriter partsWriter = null;
-      if (inUnion) {
-        parts = (MROutput) getOutputs().get("parts");
-        partsWriter = parts.getWriter();
-      }
-      while (kvReader.next()) {
-        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
-        while (itr.hasMoreTokens()) {
-          word.set(itr.nextToken());
-          kvWriter.write(word, one);
-          if (inUnion) {
-            partsWriter.write(word, one);
-          }
-        }
-      }
-    }
-
-  }
-
-  public static class UnionProcessor extends SimpleMRProcessor {
-    IntWritable one = new IntWritable(1);
-
-    public UnionProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkArgument(getInputs().size() == 2);
-      Preconditions.checkArgument(getOutputs().size() == 2);
-      MROutput out = (MROutput) getOutputs().get("union");
-      MROutput allParts = (MROutput) getOutputs().get("all-parts");
-      KeyValueWriter kvWriter = out.getWriter();
-      KeyValueWriter partsWriter = allParts.getWriter();
-      Map<String, AtomicInteger> unionKv = Maps.newHashMap();
-      LogicalInput union = getInputs().get("union");
-      KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
-      while (kvReader.next()) {
-        String word = ((Text) kvReader.getCurrentKey()).toString();
-        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
-        for (int i = 0; i < intVal.get(); ++i) {
-          partsWriter.write(word, one);
-        }
-        AtomicInteger value = unionKv.get(word);
-        if (value == null) {
-          unionKv.put(word, new AtomicInteger(intVal.get()));
-        } else {
-          value.addAndGet(intVal.get());
-        }
-      }
-      LogicalInput map3 = getInputs().get("map3");
-      kvReader = (KeyValuesReader) map3.getReader();
-      while (kvReader.next()) {
-        String word = ((Text) kvReader.getCurrentKey()).toString();
-        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
-        AtomicInteger value = unionKv.get(word);
-        if (value == null) {
-          throw new TezUncheckedException("Expected to exist: " + word);
-        } else {
-          value.getAndAdd(intVal.get() * -2);
-        }
-      }
-      for (AtomicInteger value : unionKv.values()) {
-        if (value.get() != 0) {
-          throw new TezUncheckedException("Unexpected non-zero value");
-        }
-      }
-      kvWriter.write("Union", new IntWritable(unionKv.size()));
-    }
-
-  }
-
-  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
-      Map<String, LocalResource> localResources, Path stagingDir,
-      String inputPath, String outputPath) throws IOException {
-    DAG dag = new DAG("UnionExample");
-    
-    int numMaps = -1;
-    Configuration inputConf = new Configuration(tezConf);
-    MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
-        inputPath);
-    DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
-
-    Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
-        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
-    Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
-        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
-    Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
-        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
-
-    Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
-        UnionProcessor.class.getName()), 1);
-
-    Configuration outputConf = new Configuration(tezConf);
-    DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
-        TextOutputFormat.class, outputPath).create();
-    checkerVertex.addDataSink("union", od);
-    
-
-    Configuration allPartsConf = new Configuration(tezConf);
-    DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
-        TextOutputFormat.class, outputPath + "-all-parts").create();
-    checkerVertex.addDataSink("all-parts", od2);
-
-    Configuration partsConf = new Configuration(tezConf);    
-    DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
-        TextOutputFormat.class, outputPath + "-parts").create();
-    VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
-    unionVertex.addDataSink("parts", od1);
-
-    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
-            HashPartitioner.class.getName()).build();
-
-    dag.addVertex(mapVertex1)
-        .addVertex(mapVertex2)
-        .addVertex(mapVertex3)
-        .addVertex(checkerVertex)
-        .addEdge(
-            new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
-        .addEdge(
-            new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
-                new InputDescriptor(
-                    ConcatenatedMergedKeyValuesInput.class.getName())));
-    return dag;  
-  }
-
-  private static void printUsage() {
-    System.err.println("Usage: unionexample <in1> <out1>");
-  }
-
-  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
-    System.out.println("Running UnionExample");
-    // conf and UGI
-    TezConfiguration tezConf;
-    if (conf != null) {
-      tezConf = new TezConfiguration(conf);
-    } else {
-      tezConf = new TezConfiguration();
-    }
-    UserGroupInformation.setConfiguration(tezConf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    // staging dir
-    FileSystem fs = FileSystem.get(tezConf);
-    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR + ".staging" + Path.SEPARATOR
-        + Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
-    
-
-    // No need to add jar containing this class as assumed to be part of
-    // the tez jars.
-
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-    
-    TezClient tezSession = new TezClient("UnionExampleSession", tezConf);
-    tezSession.start();
-
-    DAGClient dagClient = null;
-
-    try {
-        if (fs.exists(new Path(outputPath))) {
-          throw new FileAlreadyExistsException("Output directory "
-              + outputPath + " already exists");
-        }
-        
-        Map<String, LocalResource> localResources =
-          new TreeMap<String, LocalResource>();
-        
-        DAG dag = createDAG(fs, tezConf, localResources,
-            stagingDir, inputPath, outputPath);
-
-        tezSession.waitTillReady();
-        dagClient = tezSession.submitDAG(dag);
-
-        // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
-          return false;
-        }
-        return true;
-    } finally {
-      fs.delete(stagingDir, true);
-      tezSession.stop();
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      printUsage();
-      System.exit(2);
-    }
-    UnionExample job = new UnionExample();
-    job.run(args[0], args[1], null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
deleted file mode 100644
index 93ec860..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples.helpers;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import com.google.common.base.Preconditions;
-
-public class SplitsInClientOptionParser {
-
-  private CommandLine cmdLine;
-  private String[] otherArgs;
-
-  private boolean parsed = false;
-
-  public SplitsInClientOptionParser() {
-
-  }
-
-  public String[] getRemainingArgs() {
-    Preconditions.checkState(parsed,
-        "Cannot get remaining args without parsing");
-    return otherArgs;
-  }
-
-  @SuppressWarnings("static-access")
-  public boolean parse(String[] args, boolean defaultVal) throws ParseException {
-    Preconditions.checkState(!parsed,
-        "Create a new instance for different option sets");
-    parsed = true;
-    Options opts = new Options();
-    Option opt = OptionBuilder
-        .withArgName("splits_in_client")
-        .hasArg()
-        .withDescription(
-            "specify whether splits should be generated in the client")
-        .create("generateSplitsInClient");
-    opts.addOption(opt);
-    CommandLineParser parser = new GnuParser();
-
-    cmdLine = parser.parse(opts, args, false);
-    if (cmdLine.hasOption("generateSplitsInClient")) {
-      defaultVal = Boolean.parseBoolean(cmdLine
-          .getOptionValue("generateSplitsInClient"));
-    }
-    otherArgs = cmdLine.getArgs();
-    return defaultVal;
-  }
-
-}
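
A minimal usage sketch for the parser above (the driver class and its surrounding code are illustrative assumptions, not part of the examples):

import org.apache.commons.cli.ParseException;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;

public class SplitsOptionDemo {
  public static void main(String[] args) throws ParseException {
    SplitsInClientOptionParser parser = new SplitsInClientOptionParser();
    // Returns the value of -generateSplitsInClient if given, else the default (false).
    boolean splitsInClient = parser.parse(args, false);
    // Arguments the parser did not consume, e.g. input/output paths.
    String[] remaining = parser.getRemainingArgs();
    System.out.println("generateSplitsInClient=" + splitsInClient
        + ", remaining args=" + remaining.length);
  }
}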

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
deleted file mode 100644
index 0906086..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/package.html
+++ /dev/null
@@ -1,23 +0,0 @@
-<html>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<body>
-Hadoop example code.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
deleted file mode 100644
index 8239358..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/.gitignore
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-Yahoo2009.aux
-Yahoo2009.bbl
-Yahoo2009.blg
-Yahoo2009.log
-Yahoo2009.out
-Yahoo2009.pdf

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
deleted file mode 100644
index 1deb812..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/Yahoo2009.tex
+++ /dev/null
@@ -1,370 +0,0 @@
-% Licensed to the Apache Software Foundation (ASF) under one
-% or more contributor license agreements.  See the NOTICE file
-% distributed with this work for additional information
-% regarding copyright ownership.  The ASF licenses this file
-% to you under the Apache License, Version 2.0 (the
-% "License"); you may not use this file except in compliance
-% with the License.  You may obtain a copy of the License at
-%
-%     http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS,
-% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-% See the License for the specific language governing permissions and
-% limitations under the License.
-\documentclass{article}
-\usepackage[pdftex]{hyperref}
-\usepackage[pdftex]{graphicx}
-
-\title{Winning a 60 Second Dash with a Yellow Elephant}
-\author{\href{http://people.apache.org/~omalley}{Owen O'Malley} and 
-        \href{http://people.apache.org/~acmurthy}{Arun C. Murthy}\\
-\href{http://www.yahoo.com/}{Yahoo!}\\
-owen@yahoo-inc.com and acm@yahoo-inc.com}
-\date{April 2009}
-\begin{document}
-\maketitle
-\href{http://hadoop.apache.org/core}{Apache Hadoop} is an open source
-software framework that dramatically simplifies writing distributed
-data intensive applications. It provides a distributed file system,
-which is modeled after the Google File System\cite{gfs}, and a
-map/reduce\cite{mapreduce} implementation that manages distributed
-computation. Jim Gray defined a benchmark to compare large sorting
-programs. Since the core of map/reduce is a distributed sort, most of
-the custom code is glue to get the desired behavior.
-
-\section{Benchmark Rules}
-
-Jim Gray's sort benchmark consists of a set of many related
-benchmarks, each with its own rules. All of the sort benchmarks
-measure the time to sort different numbers of 100 byte records. The
-first 10 bytes of each record is the key and the rest is the
-value. The \textbf{minute sort} must finish end to end in less than a
-minute. The \textbf{Gray sort} must sort more than 100 terabytes and
-must run for at least an hour.
-
-\begin{itemize}
-\item The input data must precisely match the data generated by the C
-  data generator.
-\item The input must not be in the operating system's file
-  cache when the job starts. Under Linux, this requires using the memory for something
-  else between sorting runs.
-\item The input and output data must not be compressed.
-\item The output must not overwrite the input.
-\item The output must be synced to disk.
-\item The 128 bit sum of the crc32's of each key/value pair must be
-  calculated for the input and output. Naturally, they must be
-  identical.
-\item The output may be divided into multiple output files, but it
-  must be totally ordered (simply concatenating the output files must
-  produce the completely sorted output).
-\item Starting and distributing the application to the cluster must be
-  included in the execution time.
-\item Any sampling must be included in the execution time.
-\end{itemize}
-
-\section{Hadoop implementation}
-
-We extended the programs from last year to create and manipulate the
-new binary format and match the new rules. There are now 4 Hadoop
-map/reduce applications to support the benchmark:
-\begin{enumerate}
-\item \textbf{TeraGen} is a map/reduce program to generate the data.
-\item \textbf{TeraSort} samples the input data and uses map/reduce to
-  sort the data into a total order.
-\item \textbf{TeraSum} is a map/reduce program that computes the 128 bit
-  sum of the crc32 of each key/value pair.
-\item \textbf{TeraValidate} is a map/reduce program that validates that the
-  output is sorted and computes the sum of the checksums, as TeraSum does.
-\end{enumerate}
-The update to the terasort programs will be checked in as
-\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716}.
-
-\textbf{TeraGen} generates input data for the sort that is byte for byte
-equivalent to the C version that was released in March of 2009,
-including specific keys and values. It divides the desired number of
-rows by the desired number of tasks and assigns ranges of rows to each
-map. The map jumps the random number generator to the correct value
-for the first row and generates the following rows.
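
A rough sketch of that row-range bookkeeping (class name, row count, and task count below are illustrative assumptions, not TeraGen's actual code):

public class TeraGenSplitSketch {
  public static void main(String[] args) {
    // Divide the requested rows evenly across the generator maps; the last
    // map picks up any remainder.
    long totalRows = 10_000_000_000L;   // e.g. 10^10 hundred-byte rows, roughly 1 TB
    int numMaps = 8000;                 // generator tasks
    long rowsPerMap = totalRows / numMaps;
    for (int map = 0; map < numMaps; map++) {
      long firstRow = (long) map * rowsPerMap;
      long rowCount = (map == numMaps - 1) ? totalRows - firstRow : rowsPerMap;
      // Each map would advance its random generator to the state for firstRow
      // and then emit rowCount consecutive rows.
      System.out.println("map " + map + ": rows [" + firstRow + ", "
          + (firstRow + rowCount) + ")");
    }
  }
}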
-
-\textbf{TeraSort} is a standard map/reduce sort, except for a custom
-partitioner that ensures that all of the keys in reduce $N$ are after
-all of the keys in reduce $N-1$. This is a requirement of the contest
-so that the output of the sort is totally ordered, even if it is
-divided up by reduce.
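
To make the ordering constraint concrete, here is a minimal sketch of partitioning keys against sampled cut points (the class, method, and helper names are assumptions, not the actual TeraSort partitioner):

class TotalOrderSketch {
  // Keys <= cutPoints[0] go to reduce 0, keys in (cutPoints[0], cutPoints[1]]
  // go to reduce 1, and so on; everything above the last cut point lands in
  // the last reduce, so all keys in reduce N follow all keys in reduce N-1.
  static int partitionFor(byte[] key, byte[][] cutPoints) {
    int p = 0;
    while (p < cutPoints.length && compareUnsigned(key, cutPoints[p]) > 0) {
      p++;
    }
    return p;   // a real implementation would use a trie or binary search
  }

  // Assumed helper: lexicographic comparison of unsigned bytes.
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }
}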
-
-We wrote an input and output format, used by all 4 applications to
-read and write the files in the new format.
-
-\textbf{TeraSum} computes the 128 bit sum of the CRC32 of each
-key/value pair. Each map computes the sum of its input and emits a
-single 128 bit sum. There is a single reduce that adds the sums from
-each map. We used this program on the input directory to calculate the
-sum of the checksums of each key/value pair to check the correctness
-of the output of the sort. We also used TeraSum on a distinct dataset
-that was larger than the total RAM in the cluster to flush the Linux
-file cache between runs of the small (500 GB and 1 TB) sorts.
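
In our notation (not the paper's), the quantity TeraSum produces is

\[ S \;=\; \Bigl(\sum_{i} \mathrm{crc32}(k_i \,\|\, v_i)\Bigr) \bmod 2^{128}, \]

where the sum runs over every key/value pair $(k_i, v_i)$, each map emits the partial sum over its own input, and the single reduce adds the partial sums.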
-
-\textbf{TeraValidate} ensures that the output is globally sorted. It
-creates one map per file in the output directory and each map
-ensures that each key is less than or equal to the previous one. The
-map also generates records with the first and last keys of the file
-and the reduce ensures that the first key of file $i$ is greater than
-the last key of file $i-1$. Any problems are reported as output of the
-reduce with the keys that are out of order. Additionally, TeraValidate
-calculates the sum of checksums of the output directory.
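
A compressed sketch of that reduce-side boundary check (the class, method, and field layout are ours, purely illustrative; it reuses the compareUnsigned helper from the partitioner sketch above):

class TeraValidateSketch {
  // 'boundaries' holds one {firstKey, lastKey} pair per output file, in file order.
  static void checkBoundaries(java.util.List<byte[][]> boundaries) {
    for (int i = 1; i < boundaries.size(); i++) {
      byte[] prevLast = boundaries.get(i - 1)[1];
      byte[] curFirst = boundaries.get(i)[0];
      // Each file must start at or after the point where the previous file ended.
      if (TotalOrderSketch.compareUnsigned(prevLast, curFirst) > 0) {
        throw new IllegalStateException(
            "Output files " + (i - 1) + " and " + i + " are out of order");
      }
    }
  }
}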
-
-\section{Hardware and Operating System}
-
-We ran our benchmarks on Yahoo's Hammer cluster. Hammer's hardware is
-very similar to the hardware that we used in last year's terabyte
-sort. The hardware and operating system details are:
-
-\begin{itemize}
-\item approximately 3800 nodes (in such a large cluster, nodes are
-  always down)
-\item 2 quad core Xeons @ 2.5 GHz per node
-\item 4 SATA disks per node
-\item 8 GB RAM per node (upgraded to 16 GB before the petabyte sort)
-\item 1 gigabit ethernet on each node
-\item 40 nodes per rack
-\item 8 gigabit ethernet uplinks from each rack to the core
-\item Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
-\item Sun Java JDK (1.6.0\_05-b13 and 1.6.0\_13-b03) (32 and 64 bit)
-\end{itemize}
-
-We hit a JVM bug in 1.6.0\_05-b13 on the larger sorts (100TB and 1PB)
-and switched over to the later JVM, which resolved the issue. For the
-larger sorts, we used 64 bit JVMs for the Name Node and Job Tracker.
-
-\section{Software and Configuration}
-
-The version of Hadoop we used was a private branch of trunk that was
-started in January 2009, which is after the 0.20 branch was feature
-frozen. We used git to manage our branch and it allowed us to easily
-coordinate our work, track our changes, and resynchronize with the
-current Hadoop trunk.
-
-The changes include:
-
-\begin{enumerate}
-
-\item Updated the terasort example in the Hadoop code base to match
-  the dataset defined by the rule changes in the benchmark from March
-  of 2009.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716})
-
-\item We reimplemented the reducer side of Hadoop's shuffle. The
-  redesign improved the performance of the shuffle and removed
-  bottlenecks and over-throttling. It also made the code more
-  maintainable and understandable by breaking a 3000 line Java file
-  into multiple classes with a clean set of interfaces.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5223}{HADOOP-5223})
-
-\item The new shuffle also fetches multiple map outputs from the same
-  node over each connection rather than one at a time. Fetching
-  multiple map outputs at the same time avoids connection setup costs
-  and also avoids the round trip while the server responds to the request.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-1338}{HADOOP-1338})
-  
-\item Allowed configuring timeouts on the shuffle connections and we
-  shortened them for the small sorts. We observed cases where the
-  connections for the shuffle would hang until the timeout, which made
-  low latency jobs impossibly long.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5789}{HADOOP-5789})
-
-\item Set TCP no-delay and more frequent pings between the Task and
-  the Task Tracker to reduce latency in detecting problems.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5788}{HADOOP-5788})
-
-\item We added some protection code to detect incorrect data being
-  transmitted in the shuffle from causing the reduce to fail. It
-  appears this is either a JVM NIO bug or Jetty bug that likely
-  affects 0.20 and trunk under heavy load.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5783}{HADOOP-5783})
-
-\item We used LZO compression on the map outputs. On the new dataset, LZO
-  compresses down to 45\% of the original size. By comparison, the
-  dataset from last year compresses to 20\% of the original size. Last
-  year, the shuffle would run out of direct buffers if we used
-  compression on the map outputs.
-
-\item We implemented memory to memory merges in the reduce during the
-  shuffle to combine the map outputs in memory before we finish the
-  shuffle, thereby reducing the work needed when the reduce is
-  running.
-
-\item We multi-threaded the sampling code that read the input set to
-  find the partition points between the reduces. We also wrote a
-  simple partitioner that assumes the keys are evenly
-  distributed. Since the new dataset does not require sampling, the
-  simple partitioner produces very even partitions.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-4946}{HADOOP-4946})
-
-\item On the smaller clusters, we configured the system with faster
-  heartbeat cycles from the Task Trackers to the Job Tracker (it
-  defaults to 10 secs / 1000 nodes, but we made it configurable and
-  brought it down to 2 seconds/1000 nodes to provide lower latency)
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5784}{HADOOP-5784})
-
-\item Typically the Job Tracker assigns tasks to Task Trackers on a
-  first come first served basis. This greedy assignment of tasks did
-  not lead to good data locality. However, by taking a global view and
-  placing all of the map tasks at once, the system achieves much better
-  locality. Rather than implement global scheduling for all of Hadoop,
-  which would be much harder, we implemented a global scheduler for
-  the terasort example in the input format. Basically, the input
-  format computes the splits and assigns work to the nodes that have
-  the fewest blocks first. For a node that has more blocks
-  than map slots, it picks the blocks that have the fewest remaining
-  locations. This greedy global algorithm seems to get very good
-  locality. The input format would schedule the maps and then change
-  the input split descriptions to only have a single location instead
-  of the original 3. This increased task locality by 40\% or so over
-  the greedy scheduler.
-
-\item Hadoop 0.20 added setup and cleanup tasks. Since they are not
-  required for the sort benchmarks, we allow them to be disabled to
-  reduce the latency of starting and stopping the job.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5785}{HADOOP-5785})
-
-\item We discovered a performance problem where in some contexts the
-  cost of using the JNI-based CRC32 was very high. By implementing it
-  in pure Java, the average case is a little slower, but the worst
-  case is much better.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5598}{HADOOP-5598})
-
-\item We found and removed some hard-coded wait loops from the
-  framework that don't matter for large jobs, but can seriously slow
-  down low latency jobs.
-
-\item Allowed setting the logging level for the tasks, so that we
-  could cut down on logging. When running for "real" we configure the
-  logging level to WARN instead of the default INFO. Reducing the
-  amount of logging has a huge impact on the performance of the
-  system, but obviously makes debugging and analysis much harder.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5786}{HADOOP-5786})
-
-\item One optimization that we didn't finish is to optimize the job
-  planning code. Currently, it uses an RPC to the Name Node for each
-  input file, which we have observed taking a substantial amount of
-  time. For the terabyte sort, our investigations show that we
-  could save about 4 seconds out of the 8 that were spent on setting
-  up the job.
-  (\href{http://issues.apache.org/jira/browse/HADOOP-5795}{HADOOP-5795})
-
-\end{enumerate}
-
-\section{Results}
-
-Hadoop has made a lot of progress in the last year and we were able to
-run much lower latency jobs as well as much larger jobs. Note that in
-any large cluster and distributed application, there are a lot of
-moving pieces and thus we have seen a wide variation in execution
-times. As Hadoop evolves and becomes more graceful in the presence of
-hardware degradation and failure, this variation should smooth
-out. The best times for each of the listed sort sizes were:
-\\
-
-\begin{tabular}{| c | c | c | c | c | c |}
-\hline
-Bytes & Nodes & Maps & Reduces & Replication & Time \\
-\hline
-$5*10^{11}$ & 1406 & 8000 & 2600 & 1 & 59 seconds \\
-$10^{12}$ & 1460 & 8000 & 2700 & 1 & 62 seconds \\
-$10^{14}$ & 3452 & 190,000 & 10,000 & 2 & 173 minutes \\
-$10^{15}$ & 3658 & 80,000 & 20,000 & 2 & 975 minutes \\
-\hline
-\end{tabular}\\
-
-Within the rules for the 2009 Gray sort, our 500 GB sort set a new
-record for the minute sort and the 1PB sort set a new record of 1.03
-TB/minute. The 62 second terabyte sort would have set a new record,
-but the terabyte benchmark that we won last year has been
-retired. (Clearly the minute sort and terabyte sort are rapidly
-converging, and thus it is not a loss.)  One piece of trivia is that
-only the petabyte dataset had any duplicate keys (40 of them).
-
-Because the smaller sorts needed lower latency and faster network, we
-only used part of the cluster for those runs. In particular, instead
-of our normal 5:1 over subscription between racks, we limited it to 16
-nodes in each rack for a 2:1 over subscription. The smaller runs can
-also use output replication of 1, because they only take minutes to
-run and run on smaller clusters, the likelihood of a node failing is
-fairly low. On the larger runs, failure is expected and thus
-replication of 2 is required. HDFS protects against data loss during
-rack failure by writing the second replica on a different rack and
-thus writing the second replica is relatively slow.
-
-We've included the timelines for the jobs counting from the job
-submission at the Job Tracker. The diagrams show the number of tasks
-running at each point in time. While maps only have a single phase,
-the reduces have three: \textbf{shuffle}, \textbf{merge}, and
-\textbf{reduce}. The shuffle is the transfer of the data from the
-maps. Merge doesn't happen in these benchmarks, because none of the
-reduces need multiple levels of merges. Finally, the reduce phase is
-where the final merge and writing to HDFS happens. We've also included
-a category named \textbf{waste} that represents task attempts that
-were running, but ended up either failing, or being killed (often as
-speculatively executed task attempts). The job logs and configuration
-for the four runs, which are the raw data for the charts, are
-available on
-\url{http://people.apache.org/~omalley/tera-2009/}.
-
-If you compare this years charts to last year's, you'll notice that
-tasks are launching much faster now. Last year we only launched one
-task per heartbeat, so it took 40 seconds to get all of the tasks
-launched. Now, Hadoop will fill up a Task Tracker in a single
-heartbeat. Reducing that job launch overhead is very important
-for getting runs under a minute.
-
-As with last year, we ran with significantly larger tasks than the
-defaults for Hadoop. Even with the new more aggressive shuffle,
-minimizing the number of transfers (maps * reduces) is very important
-to the performance of the job. Notice that in the petabyte sort, each
-map is processing 15 GB instead of the default 128 MB and each reduce
-is handling 50 GB. When we ran the petabyte with a more typical value
-of 1.5 GB / map, it took 40 hours to finish. Therefore, to increase
-throughput, it makes sense to consider increasing the default block
-size, which translates into the default map size, to at least 1 GB.
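
As a back-of-the-envelope illustration (our arithmetic, not the paper's): the shuffle moves on the order of maps $\times$ reduces map-output segments, so the petabyte run's 80,000 maps and 20,000 reduces mean about $1.6 \times 10^9$ transfers; with the default 128 MB splits the same petabyte would need roughly $10^{15} / (128 \times 2^{20}) \approx 7.5$ million maps, or about 90 times as many transfers.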
-
-\section{Comments on the Rule Changes}
-
-The group that runs the Gray Sort Benchmark made very substantial
-changes to the rules this year. The changes were not announced, but
-rather appeared on the website in March. We feel that it was too late
-to make rule changes and that the benchmark should have been changed
-next year. We'd also like to point out that while most of the changes to
-the data generator were positive, it was a poor choice to remove the
-skewed distribution of the keys. The previously skewed distribution
-required sampling of the input to pick good partition points between
-the reduces. The current dataset picks keys so completely at random that
-sampling is counterproductive and yields less even distributions between
-the reduces.
-
-\bibliographystyle{abbrv}
-\bibliography{tera}
-
-\begin{figure}[!p]
-\includegraphics[width=4.21in]{500GBTaskTime.png}
-\caption{500 GB sort tasks across time}\label{500GbTimeline}
-\end{figure} 
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{1TBTaskTime.png}
-\caption{1 TB sort tasks across time}\label{1TbTimeline}
-\end{figure} 
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{100TBTaskTime.png}
-\caption{100 TB sort tasks across time}\label{100TbTimeline}
-\end{figure} 
-
-\begin{figure}[!p]
-\includegraphics[width=4.5in]{1PBTaskTime.png}
-\caption{1 PB sort tasks across time}\label{1PbTimeline}
-\end{figure} 
-
-\end{document}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
deleted file mode 100644
index 50a797f..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/2009-write-up/tera.bib
+++ /dev/null
@@ -1,31 +0,0 @@
-% Licensed to the Apache Software Foundation (ASF) under one
-% or more contributor license agreements.  See the NOTICE file
-% distributed with this work for additional information
-% regarding copyright ownership.  The ASF licenses this file
-% to you under the Apache License, Version 2.0 (the
-% "License"); you may not use this file except in compliance
-% with the License.  You may obtain a copy of the License at
-%
-%     http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS,
-% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-% See the License for the specific language governing permissions and
-% limitations under the License.
-@INPROCEEDINGS{mapreduce,
-	AUTHOR = "Jeffrey Dean and Sanjay Ghemawat",
-	TITLE = "MapReduce: Simplified Data Processing on Large Clusters",
-	BOOKTITLE = "Sixth Symposium on Operating System Design and Implementation",
-	MONTH = "December", 
-        ADDRESS = "San Francisco, CA",
-	YEAR = {2004}	}
-
-@INPROCEEDINGS{gfs,
-	AUTHOR = "Sanjay Ghemawat and Howard Gobioff and Shun-Tak Leung",
-	TITLE = "The Google File System",
-	BOOKTITLE = "19th Symposium on Operating Systems Principles",
-        ORGANIZATION = "ACM",
-	MONTH = "October", 
-        ADDRESS = "Lake George, NY",
-	YEAR = {2003}	}