You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/20 01:39:10 UTC

svn commit: r697283 - in /hadoop/core/trunk: ./ bin/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/lib/ src/tools/org/apache/hadoop/tools/

Author: omalley
Date: Fri Sep 19 16:39:09 2008
New Revision: 697283

URL: http://svn.apache.org/viewvc?rev=697283&view=rev
Log:
HADOOP-3019. A new library to support total order partitions.
(cdouglas via omalley)

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/bin/hadoop
    hadoop/core/trunk/build.xml
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:39:09 2008
@@ -199,6 +199,9 @@
     allow Map-Reduce applications to work with databases. (Fredrik Hedberg and
     Enis Soztutar via acmurthy)
 
+    HADOOP-3019. A new library to support total order partitions.
+    (cdouglas via omalley)
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Fri Sep 19 16:39:09 2008
@@ -122,6 +122,9 @@
 if [ -d "$HADOOP_HOME/build/test/classes" ]; then
   CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/test/classes
 fi
+if [ -d "$HADOOP_HOME/build/tools" ]; then
+  CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/tools
+fi
 
 # so that filenames w/ spaces are handled correctly in loops below
 IFS=
@@ -220,6 +223,10 @@
   CLASS=org.apache.hadoop.tools.HadoopArchives
   CLASSPATH=${CLASSPATH}:${TOOL_PATH}
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "sampler" ] ; then
+  CLASS=org.apache.hadoop.tools.InputSampler
+  CLASSPATH=${CLASSPATH}:${TOOL_PATH}
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 else
   CLASS=$COMMAND
 fi

Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Fri Sep 19 16:39:09 2008
@@ -422,7 +422,7 @@
   </target>
 
   <target name="compile-examples" 
-          depends="compile-core,compile-c++-examples">
+          depends="compile-core,compile-tools,compile-c++-examples">
     <javac 
      encoding="${build.encoding}" 
      srcdir="${examples.dir}"
@@ -434,7 +434,10 @@
      source="${javac.version}"
      deprecation="${javac.deprecation}">
       <compilerarg line="${javac.args} ${javac.args.warnings}" />
-      <classpath refid="classpath"/>
+      <classpath>
+        <path refid="classpath"/>
+        <pathelement location="${build.tools}"/>
+      </classpath>
     </javac>    
   </target>
 
@@ -852,6 +855,7 @@
             <include name="*/lib/*.jar" />
           </fileset>
           <pathelement path="${java.class.path}"/>
+          <pathelement location="${build.tools}"/>
         </classpath>
 
     	<group title="Core" packages="org.apache.*"/>

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java Fri Sep 19 16:39:09 2008
@@ -19,10 +19,12 @@
 package org.apache.hadoop.examples;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
@@ -30,6 +32,8 @@
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.tools.InputSampler;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -43,9 +47,10 @@
  *            [-outFormat <i>output format class</i>] 
  *            [-outKey <i>output key class</i>] 
  *            [-outValue <i>output value class</i>] 
+ *            [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
  *            <i>in-dir</i> <i>out-dir</i> 
  */
-public class Sort extends Configured implements Tool {
+public class Sort<K,V> extends Configured implements Tool {
 
   static int printUsage() {
     System.out.println("sort [-m <maps>] [-r <reduces>] " +
@@ -53,6 +58,7 @@
                        "[-outFormat <output format class>] " + 
                        "[-outKey <output key class>] " +
                        "[-outValue <output value class>] " +
+                       "[-totalOrder <pcnt> <num samples> <max splits>] " +
                        "<input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
     return -1;
@@ -87,6 +93,7 @@
     Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
     Class<? extends Writable> outputValueClass = BytesWritable.class;
     List<String> otherArgs = new ArrayList<String>();
+    InputSampler.Sampler<K,V> sampler = null;
     for(int i=0; i < args.length; ++i) {
       try {
         if ("-m".equals(args[i])) {
@@ -105,6 +112,13 @@
         } else if ("-outValue".equals(args[i])) {
           outputValueClass = 
             Class.forName(args[++i]).asSubclass(Writable.class);
+        } else if ("-totalOrder".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler =
+            new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
         } else {
           otherArgs.add(args[i]);
         }
@@ -136,6 +150,20 @@
     FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
     FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
 
+    if (sampler != null) {
+      System.out.println("Sampling input to effect total-order sort...");
+      jobConf.setPartitionerClass(TotalOrderPartitioner.class);
+      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
+      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
+      Path partitionFile = new Path(inputDir, "_sortPartitioning");
+      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
+      InputSampler.<K,V>writePartitionFile(jobConf, sampler);
+      URI partitionUri = new URI(partitionFile.toString() +
+                                 "#" + "_sortPartitioning");
+      DistributedCache.addCacheFile(partitionUri, jobConf);
+      DistributedCache.createSymlink(jobConf);
+    }
+
     System.out.println("Running on " +
         cluster.getTaskTrackers() +
         " nodes to sort from " + 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Partitioner effecting a total order by reading split points from
+ * an externally generated source.
+ */
+public class TotalOrderPartitioner<K extends WritableComparable,V>
+    implements Partitioner<K,V> {
+
+  private Node partitions;
+  public static final String DEFAULT_PATH = "_partition.lst";
+
+  public TotalOrderPartitioner() { }
+
+  /**
+   * Read in the partition file and build indexing data structures.
+   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
+   * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
+   * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
+   * will be built. Otherwise, keys will be located using a binary search of
+   * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
+   * defined for this job. The input file must be sorted with the same
+   * comparator and contain {@link
+     org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
+   */
+  @SuppressWarnings("unchecked") // keytype from conf not static
+  public void configure(JobConf job) {
+    try {
+      String parts = getPartitionFile(job);
+      final Path partFile = new Path(parts);
+      final FileSystem fs = (DEFAULT_PATH.equals(parts))
+        ? FileSystem.getLocal(job)     // assume in DistributedCache
+        : partFile.getFileSystem(job);
+
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
+      if (splitPoints.length != job.getNumReduceTasks() - 1) {
+        throw new IOException("Wrong number of partitions in keyset");
+      }
+      RawComparator<K> comparator =
+        (RawComparator<K>) job.getOutputKeyComparator();
+      for (int i = 0; i < splitPoints.length - 1; ++i) {
+        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
+          throw new IOException("Split points are out of order");
+        }
+      }
+      boolean natOrder =
+        job.getBoolean("total.order.partitioner.natural.order", true);
+      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
+        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
+            splitPoints.length, new byte[0],
+            job.getInt("total.order.partitioner.max.trie.depth", 2));
+      } else {
+        partitions = new BinarySearchNode(splitPoints, comparator);
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+                                 // by construction, we know if our keytype
+  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
+  public int getPartition(K key, V value, int numPartitions) {
+    return partitions.findPartition(key);
+  }
+
+  /**
+   * Set the path to the SequenceFile storing the sorted partition keyset.
+   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
+   * keys in the SequenceFile.
+   */
+  public static void setPartitionFile(JobConf job, Path p) {
+    job.set("total.order.partitioner.path", p.toString());
+  }
+
+  /**
+   * Get the path to the SequenceFile storing the sorted partition keyset.
+   * @see #setPartitionFile(JobConf,Path)
+   */
+  public static String getPartitionFile(JobConf job) {
+    return job.get("total.order.partitioner.path", DEFAULT_PATH);
+  }
+
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+
+  /**
+   * Base class for trie nodes. If the keytype is memcomp-able, this builds
+   * tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
+   * bytes.
+   */
+  static abstract class TrieNode implements Node<BinaryComparable> {
+    private final int level;
+    TrieNode(int level) {
+      this.level = level;
+    }
+    int getLevel() {
+      return level;
+    }
+  }
+
+  /**
+   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
+   * where disabled by <tt>total.order.partitioner.natural.order</tt>,
+   * search the partition keyset with a binary search.
+   */
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+    public int findPartition(K key) {
+      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+  /**
+   * An inner trie node that contains 256 children based on the next
+   * character.
+   */
+  class InnerTrieNode extends TrieNode {
+    private TrieNode[] child = new TrieNode[256];
+
+    InnerTrieNode(int level) {
+      super(level);
+    }
+    public int findPartition(BinaryComparable key) {
+      int level = getLevel();
+      if (key.getLength() <= level) {
+        return child[0].findPartition(key);
+      }
+      return child[0xFF & key.getBytes()[level]].findPartition(key);
+    }
+  }
+
+  /**
+   * A leaf trie node that scans for the key between lower..upper.
+   */
+  class LeafTrieNode extends TrieNode {
+    final int lower;
+    final int upper;
+    final BinaryComparable[] splitPoints;
+    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      super(level);
+      this.lower = lower;
+      this.upper = upper;
+      this.splitPoints = splitPoints;
+    }
+    public int findPartition(BinaryComparable key) {
+      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+
+  /**
+   * Read the cut points from the given IFile.
+   * @param fs The file system
+   * @param p The path to read
+   * @param keyClass The map output key class
+   * @param job The job config
+   * @throws IOException
+   */
+                                 // matching key types enforced by passing in
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      JobConf job) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+    ArrayList<K> parts = new ArrayList<K>();
+    K key = (K) ReflectionUtils.newInstance(keyClass, job);
+    NullWritable value = NullWritable.get();
+    while (reader.next(key, value)) {
+      parts.add(key);
+      key = (K) ReflectionUtils.newInstance(keyClass, job);
+    }
+    reader.close();
+    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+  }
+
+  /**
+   * Given a sorted set of cut points, build a trie that will find the correct
+   * partition quickly.
+   * @param splits the list of cut points
+   * @param lower the lower bound of partitions 0..numPartitions-1
+   * @param upper the upper bound of partitions 0..numPartitions-1
+   * @param prefix the prefix that we have already checked against
+   * @param maxDepth the maximum depth we will build a trie for
+   * @return the trie node that will divide the splits correctly
+   */
+  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
+      int upper, byte[] prefix, int maxDepth) {
+    final int depth = prefix.length;
+    if (depth >= maxDepth || lower == upper) {
+      return new LeafTrieNode(depth, splits, lower, upper);
+    }
+    InnerTrieNode result = new InnerTrieNode(depth);
+    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+    // append an extra byte on to the prefix
+    int currentBound = lower;
+    for(int ch = 0; ch < 255; ++ch) {
+      trial[depth] = (byte) (ch + 1);
+      lower = currentBound;
+      while (currentBound < upper) {
+        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+          break;
+        }
+        currentBound += 1;
+      }
+      trial[depth] = (byte) ch;
+      result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
+                                   maxDepth);
+    }
+    // pick up the rest
+    trial[depth] = 127;
+    result.child[255] = buildTrie(splits, currentBound, upper, trial,
+                                  maxDepth);
+    return result;
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TestTotalOrderPartitioner extends TestCase {
+
+  private static final Text[] splitStrings = new Text[] {
+    // -inf            // 0
+    new Text("aabbb"), // 1
+    new Text("babbb"), // 2
+    new Text("daddd"), // 3
+    new Text("dddee"), // 4
+    new Text("ddhee"), // 5
+    new Text("dingo"), // 6
+    new Text("hijjj"), // 7
+    new Text("n"),     // 8
+    new Text("yak"),   // 9
+  };
+
+  static class Check<T> {
+    T data;
+    int part;
+    Check(T data, int part) {
+      this.data = data;
+      this.part = part;
+    }
+  }
+
+  private static final ArrayList<Check<Text>> testStrings =
+    new ArrayList<Check<Text>>();
+  static {
+    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+    testStrings.add(new Check<Text>(new Text("aaabb"), 0));
+    testStrings.add(new Check<Text>(new Text("aabbb"), 1));
+    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+    testStrings.add(new Check<Text>(new Text("babbb"), 2));
+    testStrings.add(new Check<Text>(new Text("baabb"), 1));
+    testStrings.add(new Check<Text>(new Text("yai"), 8));
+    testStrings.add(new Check<Text>(new Text("yak"), 9));
+    testStrings.add(new Check<Text>(new Text("z"), 9));
+    testStrings.add(new Check<Text>(new Text("ddngo"), 5));
+    testStrings.add(new Check<Text>(new Text("hi"), 6));
+  };
+
+  private static <T extends WritableComparable> Path writePartitionFile(
+      String testname, JobConf conf, T[] splits) throws IOException {
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
+                                 ).makeQualified(fs);
+    Path p = new Path(testdir, testname + "/_partition.lst");
+    TotalOrderPartitioner.setPartitionFile(conf, p);
+    conf.setNumReduceTasks(splits.length + 1);
+    SequenceFile.Writer w = null;
+    try {
+      NullWritable nw = NullWritable.get();
+      w = SequenceFile.createWriter(fs, conf, p,
+          splits[0].getClass(), NullWritable.class,
+          SequenceFile.CompressionType.NONE);
+      for (int i = 0; i < splits.length; ++i) {
+        w.append(splits[i], NullWritable.get());
+      }
+    } finally {
+      if (null != w)
+        w.close();
+    }
+    return p;
+  }
+
+  public void testTotalOrderMemCmp() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+    JobConf job = new JobConf();
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalordermemcmp", job, splitStrings);
+    job.setMapOutputKeyClass(Text.class);
+    try {
+      partitioner.configure(job);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(job).delete(p);
+    }
+  }
+
+  public void testTotalOrderBinarySearch() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+    JobConf job = new JobConf();
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalorderbinarysearch", job, splitStrings);
+    job.setBoolean("total.order.partitioner.natural.order", false);
+    job.setMapOutputKeyClass(Text.class);
+    try {
+      partitioner.configure(job);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(job).delete(p);
+    }
+  }
+
+  public static class ReverseStringComparator implements RawComparator<Text> {
+    public int compare(Text a, Text b) {
+      return -a.compareTo(b);
+    }
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
+                                                  b2, s2+n2, l2-n2);
+    }
+  }
+
+  public void testTotalOrderCustomComparator() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+    JobConf job = new JobConf();
+    Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+    Arrays.sort(revSplitStrings, new ReverseStringComparator());
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalordercustomcomparator", job, revSplitStrings);
+    job.setBoolean("total.order.partitioner.natural.order", false);
+    job.setMapOutputKeyClass(Text.class);
+    job.setOutputKeyComparatorClass(ReverseStringComparator.class);
+    ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
+    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+    revCheck.add(new Check<Text>(new Text("aaabb"), 9));
+    revCheck.add(new Check<Text>(new Text("aabbb"), 9));
+    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+    revCheck.add(new Check<Text>(new Text("babbb"), 8));
+    revCheck.add(new Check<Text>(new Text("baabb"), 8));
+    revCheck.add(new Check<Text>(new Text("yai"), 1));
+    revCheck.add(new Check<Text>(new Text("yak"), 1));
+    revCheck.add(new Check<Text>(new Text("z"), 0));
+    revCheck.add(new Check<Text>(new Text("ddngo"), 4));
+    revCheck.add(new Check<Text>(new Text("hi"), 3));
+    try {
+      partitioner.configure(job);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : revCheck) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(job).delete(p);
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ */
+public class InputSampler<K,V> implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+  static int printUsage() {
+    System.out.println("sampler -r <reduces>\n" +
+                       "      [-inFormat <input format class>]\n" +
+                       "      [-keyClass <map input & output key class>]\n" +
+                       "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+                       "// Sample from random splits at random (general)\n" +
+                       "       -splitSample <numSamples> <maxsplits> | " +
+                       "             // Sample from first records in splits (random data)\n"+
+                       "       -splitInterval <double pcnt> <maxsplits>]" +
+                       "             // Sample from splits at intervals (sorted data)");
+    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  private JobConf conf;
+
+  public InputSampler(JobConf conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    if (!(conf instanceof JobConf)) {
+      this.conf = new JobConf(conf);
+    } else {
+      this.conf = (JobConf) conf;
+    }
+  }
+
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> implements Sampler<K,V> {
+
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> implements Sampler<K,V> {
+    private double freq;
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      this.freq = freq;
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(r.nextInt(numSamples - 1), key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> implements Sampler<K,V> {
+    private final double freq;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      this.freq = freq;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Write a partition file for the given job, using the Sampler provided.
+   * Queries the sampler for a sample keyset, sorts by the output key
+   * comparator, selects the keys for each rank, and writes to the destination
+   * returned from {@link
+     org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
+   */
+  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
+  public static <K,V> void writePartitionFile(JobConf job,
+      Sampler<K,V> sampler) throws IOException {
+    final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
+    int numPartitions = job.getNumReduceTasks();
+    K[] samples = sampler.getSample(inf, job);
+    LOG.info("Using " + samples.length + " samples");
+    RawComparator<K> comparator =
+      (RawComparator<K>) job.getOutputKeyComparator();
+    Arrays.sort(samples, comparator);
+    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
+    FileSystem fs = dst.getFileSystem(job);
+    if (fs.exists(dst)) {
+      fs.delete(dst, false);
+    }
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
+        job.getMapOutputKeyClass(), NullWritable.class);
+    NullWritable nullValue = NullWritable.get();
+    float stepSize = samples.length / (float) numPartitions;
+    int last = -1;
+    for(int i = 1; i < numPartitions; ++i) {
+      int k = Math.round(stepSize * i);
+      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
+        ++k;
+      }
+      writer.append(samples[k], nullValue);
+      last = k;
+    }
+    writer.close();
+  }
+
+  /**
+   * Driver for InputSampler from the command line.
+   * Configures a JobConf instance and calls {@link #writePartitionFile}.
+   */
+  public int run(String[] args) throws Exception {
+    JobConf job = (JobConf) getConf();
+    ArrayList<String> otherArgs = new ArrayList<String>();
+    Sampler<K,V> sampler = null;
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-r".equals(args[i])) {
+          job.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else if ("-inFormat".equals(args[i])) {
+          job.setInputFormat(
+              Class.forName(args[++i]).asSubclass(InputFormat.class));
+        } else if ("-keyClass".equals(args[i])) {
+          job.setMapOutputKeyClass(
+              Class.forName(args[++i]).asSubclass(WritableComparable.class));
+        } else if ("-splitSample".equals(args[i])) {
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
+        } else if ("-splitRandom".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
+        } else if ("-splitInterval".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage();
+      }
+    }
+    if (job.getNumReduceTasks() <= 1) {
+      System.err.println("Sampler requires more than one reducer");
+      return printUsage();
+    }
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+    if (null == sampler) {
+      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
+    }
+
+    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
+    TotalOrderPartitioner.setPartitionFile(job, outf);
+    ArrayList<Path> plist = new ArrayList<Path>(otherArgs.size());
+    for (String s : otherArgs) {
+      FileInputFormat.addInputPath(job, new Path(s));
+    }
+    InputSampler.<K,V>writePartitionFile(job, sampler);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    JobConf job = new JobConf(InputSampler.class);
+    InputSampler sampler = new InputSampler(job);
+    int res = ToolRunner.run(sampler, args);
+    System.exit(res);
+  }
+}