You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/05/04 06:18:23 UTC

svn commit: r771171 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/lib/ src/mapred/org/apache/hadoop/mapreduce/lib/partition/ src/test/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapreduce/lib/partition/

Author: cdouglas
Date: Mon May  4 04:18:22 2009
New Revision: 771171

URL: http://svn.apache.org/viewvc?rev=771171&view=rev
Log:
HADOOP-5668. Change TotalOrderPartitioner to use new API. Contributed by Amareshwari Sriramadasu

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
      - copied, changed from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
      - copied, changed from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
      - copied, changed from r771170, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
Removed:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May  4 04:18:22 2009
@@ -44,6 +44,9 @@
     and TASK_CLEANUP. Removes the isMap methods from TaskID/TaskAttemptID
     classes. (ddas)
 
+    HADOOP-5668. Change TotalOrderPartitioner to use new API. (Amareshwari
+    Sriramadasu via cdouglas)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java Mon May  4 04:18:22 2009
@@ -19,400 +19,24 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * Utility for collecting samples and writing a partition file for
- * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.partition.InputSampler}
  */
-public class InputSampler<K,V> implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(InputSampler.class);
-
-  static int printUsage() {
-    System.out.println("sampler -r <reduces>\n" +
-                       "      [-inFormat <input format class>]\n" +
-                       "      [-keyClass <map input & output key class>]\n" +
-                       "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
-                       "// Sample from random splits at random (general)\n" +
-                       "       -splitSample <numSamples> <maxsplits> | " +
-                       "             // Sample from first records in splits (random data)\n"+
-                       "       -splitInterval <double pcnt> <maxsplits>]" +
-                       "             // Sample from splits at intervals (sorted data)");
-    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
-    ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
-  }
-
-  private JobConf conf;
+@Deprecated
+public class InputSampler<K,V> extends 
+  org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
 
   public InputSampler(JobConf conf) {
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    if (!(conf instanceof JobConf)) {
-      this.conf = new JobConf(conf);
-    } else {
-      this.conf = (JobConf) conf;
-    }
-  }
-
-  /**
-   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
-   */
-  public interface Sampler<K,V> {
-    /**
-     * For a given job, collect and return a subset of the keys from the
-     * input data.
-     */
-    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
-  }
-
-  /**
-   * Samples the first n records from s splits.
-   * Inexpensive way to sample random data.
-   */
-  public static class SplitSampler<K,V> implements Sampler<K,V> {
-
-    private final int numSamples;
-    private final int maxSplitsSampled;
-
-    /**
-     * Create a SplitSampler sampling <em>all</em> splits.
-     * Takes the first numSamples / numSplits records from each split.
-     * @param numSamples Total number of samples to obtain from all selected
-     *                   splits.
-     */
-    public SplitSampler(int numSamples) {
-      this(numSamples, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Create a new SplitSampler.
-     * @param numSamples Total number of samples to obtain from all selected
-     *                   splits.
-     * @param maxSplitsSampled The maximum number of splits to examine.
-     */
-    public SplitSampler(int numSamples, int maxSplitsSampled) {
-      this.numSamples = numSamples;
-      this.maxSplitsSampled = maxSplitsSampled;
-    }
-
-    /**
-     * From each split sampled, take the first numSamples / numSplits records.
-     */
-    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
-      ArrayList<K> samples = new ArrayList<K>(numSamples);
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-      int splitStep = splits.length / splitsToSample;
-      int samplesPerSplit = numSamples / splitsToSample;
-      long records = 0;
-      for (int i = 0; i < splitsToSample; ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
-            job, Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
-          samples.add(key);
-          key = reader.createKey();
-          ++records;
-          if ((i+1) * samplesPerSplit <= records) {
-            break;
-          }
-        }
-        reader.close();
-      }
-      return (K[])samples.toArray();
-    }
-  }
-
-  /**
-   * Sample from random points in the input.
-   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
-   * each split.
-   */
-  public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
-
-    /**
-     * Create a new RandomSampler sampling <em>all</em> splits.
-     * This will read every split at the client, which is very expensive.
-     * @param freq Probability with which a key will be chosen.
-     * @param numSamples Total number of samples to obtain from all selected
-     *                   splits.
-     */
-    public RandomSampler(double freq, int numSamples) {
-      this(freq, numSamples, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Create a new RandomSampler.
-     * @param freq Probability with which a key will be chosen.
-     * @param numSamples Total number of samples to obtain from all selected
-     *                   splits.
-     * @param maxSplitsSampled The maximum number of splits to examine.
-     */
-    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
-      this.freq = freq;
-      this.numSamples = numSamples;
-      this.maxSplitsSampled = maxSplitsSampled;
-    }
-
-    /**
-     * Randomize the split order, then take the specified number of keys from
-     * each split sampled, where each key is selected with the specified
-     * probability and possibly replaced by a subsequently selected key when
-     * the quota of keys from that split is satisfied.
-     */
-    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
-      ArrayList<K> samples = new ArrayList<K>(numSamples);
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-
-      Random r = new Random();
-      long seed = r.nextLong();
-      r.setSeed(seed);
-      LOG.debug("seed: " + seed);
-      // shuffle splits
-      for (int i = 0; i < splits.length; ++i) {
-        InputSplit tmp = splits[i];
-        int j = r.nextInt(splits.length);
-        splits[i] = splits[j];
-        splits[j] = tmp;
-      }
-      // our target rate is in terms of the maximum number of sample splits,
-      // but we accept the possibility of sampling additional splits to hit
-      // the target sample keyset
-      for (int i = 0; i < splitsToSample ||
-                     (i < splits.length && samples.size() < numSamples); ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
-            Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
-          if (r.nextDouble() <= freq) {
-            if (samples.size() < numSamples) {
-              samples.add(key);
-            } else {
-              // When exceeding the maximum number of samples, replace a
-              // random element with this one, then adjust the frequency
-              // to reflect the possibility of existing elements being
-              // pushed out
-              int ind = r.nextInt(numSamples);
-              if (ind != numSamples) {
-                samples.set(ind, key);
-              }
-              freq *= (numSamples - 1) / (double) numSamples;
-            }
-            key = reader.createKey();
-          }
-        }
-        reader.close();
-      }
-      return (K[])samples.toArray();
-    }
-  }
-
-  /**
-   * Sample from s splits at regular intervals.
-   * Useful for sorted data.
-   */
-  public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
-
-    /**
-     * Create a new IntervalSampler sampling <em>all</em> splits.
-     * @param freq The frequency with which records will be emitted.
-     */
-    public IntervalSampler(double freq) {
-      this(freq, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Create a new IntervalSampler.
-     * @param freq The frequency with which records will be emitted.
-     * @param maxSplitsSampled The maximum number of splits to examine.
-     * @see #getSample
-     */
-    public IntervalSampler(double freq, int maxSplitsSampled) {
-      this.freq = freq;
-      this.maxSplitsSampled = maxSplitsSampled;
-    }
-
-    /**
-     * For each split sampled, emit when the ratio of the number of records
-     * retained to the total record count is less than the specified
-     * frequency.
-     */
-    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
-      ArrayList<K> samples = new ArrayList<K>();
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-      int splitStep = splits.length / splitsToSample;
-      long records = 0;
-      long kept = 0;
-      for (int i = 0; i < splitsToSample; ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
-            job, Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
-          ++records;
-          if ((double) kept / records < freq) {
-            ++kept;
-            samples.add(key);
-            key = reader.createKey();
-          }
-        }
-        reader.close();
-      }
-      return (K[])samples.toArray();
-    }
-  }
-
-  /**
-   * Write a partition file for the given job, using the Sampler provided.
-   * Queries the sampler for a sample keyset, sorts by the output key
-   * comparator, selects the keys for each rank, and writes to the destination
-   * returned from {@link
-     org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
-   */
-  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
-  public static <K,V> void writePartitionFile(JobConf job,
-      Sampler<K,V> sampler) throws IOException {
-    final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
-    int numPartitions = job.getNumReduceTasks();
-    K[] samples = sampler.getSample(inf, job);
-    LOG.info("Using " + samples.length + " samples");
-    RawComparator<K> comparator =
-      (RawComparator<K>) job.getOutputKeyComparator();
-    Arrays.sort(samples, comparator);
-    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
-    FileSystem fs = dst.getFileSystem(job);
-    if (fs.exists(dst)) {
-      fs.delete(dst, false);
-    }
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
-        job.getMapOutputKeyClass(), NullWritable.class);
-    NullWritable nullValue = NullWritable.get();
-    float stepSize = samples.length / (float) numPartitions;
-    int last = -1;
-    for(int i = 1; i < numPartitions; ++i) {
-      int k = Math.round(stepSize * i);
-      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
-        ++k;
-      }
-      writer.append(samples[k], nullValue);
-      last = k;
-    }
-    writer.close();
-  }
-
-  /**
-   * Driver for InputSampler from the command line.
-   * Configures a JobConf instance and calls {@link #writePartitionFile}.
-   */
-  public int run(String[] args) throws Exception {
-    JobConf job = (JobConf) getConf();
-    ArrayList<String> otherArgs = new ArrayList<String>();
-    Sampler<K,V> sampler = null;
-    for(int i=0; i < args.length; ++i) {
-      try {
-        if ("-r".equals(args[i])) {
-          job.setNumReduceTasks(Integer.parseInt(args[++i]));
-        } else if ("-inFormat".equals(args[i])) {
-          job.setInputFormat(
-              Class.forName(args[++i]).asSubclass(InputFormat.class));
-        } else if ("-keyClass".equals(args[i])) {
-          job.setMapOutputKeyClass(
-              Class.forName(args[++i]).asSubclass(WritableComparable.class));
-        } else if ("-splitSample".equals(args[i])) {
-          int numSamples = Integer.parseInt(args[++i]);
-          int maxSplits = Integer.parseInt(args[++i]);
-          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
-          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
-        } else if ("-splitRandom".equals(args[i])) {
-          double pcnt = Double.parseDouble(args[++i]);
-          int numSamples = Integer.parseInt(args[++i]);
-          int maxSplits = Integer.parseInt(args[++i]);
-          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
-          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
-        } else if ("-splitInterval".equals(args[i])) {
-          double pcnt = Double.parseDouble(args[++i]);
-          int maxSplits = Integer.parseInt(args[++i]);
-          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
-          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
-        } else {
-          otherArgs.add(args[i]);
-        }
-      } catch (NumberFormatException except) {
-        System.out.println("ERROR: Integer expected instead of " + args[i]);
-        return printUsage();
-      } catch (ArrayIndexOutOfBoundsException except) {
-        System.out.println("ERROR: Required parameter missing from " +
-            args[i-1]);
-        return printUsage();
-      }
-    }
-    if (job.getNumReduceTasks() <= 1) {
-      System.err.println("Sampler requires more than one reducer");
-      return printUsage();
-    }
-    if (otherArgs.size() < 2) {
-      System.out.println("ERROR: Wrong number of parameters: ");
-      return printUsage();
-    }
-    if (null == sampler) {
-      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
-    }
-
-    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
-    TotalOrderPartitioner.setPartitionFile(job, outf);
-    for (String s : otherArgs) {
-      FileInputFormat.addInputPath(job, new Path(s));
-    }
-    InputSampler.<K,V>writePartitionFile(job, sampler);
-
-    return 0;
+    super(conf);
   }
 
-  public static void main(String[] args) throws Exception {
-    JobConf job = new JobConf(InputSampler.class);
-    InputSampler<?,?> sampler = new InputSampler(job);
-    int res = ToolRunner.run(sampler, args);
-    System.exit(res);
+  public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    writePartitionFile(new Job(job), sampler);
   }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Mon May  4 04:18:22 2009
@@ -18,367 +18,26 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Arrays;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Partitioner effecting a total order by reading split points from
  * an externally generated source.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner}
  */
-public class TotalOrderPartitioner<K extends WritableComparable,V>
+@Deprecated
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+    extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
     implements Partitioner<K,V> {
 
-  private Node partitions;
-  public static final String DEFAULT_PATH = "_partition.lst";
-
   public TotalOrderPartitioner() { }
 
-  /**
-   * Read in the partition file and build indexing data structures.
-   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
-   * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
-   * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
-   * will be built. Otherwise, keys will be located using a binary search of
-   * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
-   * defined for this job. The input file must be sorted with the same
-   * comparator and contain {@link
-     org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
-   */
-  @SuppressWarnings("unchecked") // keytype from conf not static
   public void configure(JobConf job) {
-    try {
-      String parts = getPartitionFile(job);
-      final Path partFile = new Path(parts);
-      final FileSystem fs = (DEFAULT_PATH.equals(parts))
-        ? FileSystem.getLocal(job)     // assume in DistributedCache
-        : partFile.getFileSystem(job);
-
-      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-      K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
-      if (splitPoints.length != job.getNumReduceTasks() - 1) {
-        throw new IOException("Wrong number of partitions in keyset");
-      }
-      RawComparator<K> comparator =
-        (RawComparator<K>) job.getOutputKeyComparator();
-      for (int i = 0; i < splitPoints.length - 1; ++i) {
-        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
-          throw new IOException("Split points are out of order");
-        }
-      }
-      boolean natOrder =
-        job.getBoolean("total.order.partitioner.natural.order", true);
-      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
-        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
-            splitPoints.length, new byte[0],
-            // Now that blocks of identical splitless trie nodes are 
-            // represented reentrantly, and we develop a leaf for any trie
-            // node with only one split point, the only reason for a depth
-            // limit is to refute stack overflow or bloat in the pathological
-            // case where the split points are long and mostly look like bytes 
-            // iii...iixii...iii   .  Therefore, we make the default depth
-            // limit large but not huge.
-            job.getInt("total.order.partitioner.max.trie.depth", 200));
-      } else {
-        partitions = new BinarySearchNode(splitPoints, comparator);
-      }
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Can't read partitions file", e);
-    }
-  }
-
-                                 // by construction, we know if our keytype
-  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
-  public int getPartition(K key, V value, int numPartitions) {
-    return partitions.findPartition(key);
-  }
-
-  /**
-   * Set the path to the SequenceFile storing the sorted partition keyset.
-   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
-   * keys in the SequenceFile.
-   */
-  public static void setPartitionFile(JobConf job, Path p) {
-    job.set("total.order.partitioner.path", p.toString());
-  }
-
-  /**
-   * Get the path to the SequenceFile storing the sorted partition keyset.
-   * @see #setPartitionFile(JobConf,Path)
-   */
-  public static String getPartitionFile(JobConf job) {
-    return job.get("total.order.partitioner.path", DEFAULT_PATH);
-  }
-
-  /**
-   * Interface to the partitioner to locate a key in the partition keyset.
-   */
-  interface Node<T> {
-    /**
-     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
-     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
-     */
-    int findPartition(T key);
-  }
-
-  /**
-   * Base class for trie nodes. If the keytype is memcomp-able, this builds
-   * tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
-   * bytes.
-   */
-  static abstract class TrieNode implements Node<BinaryComparable> {
-    private final int level;
-    TrieNode(int level) {
-      this.level = level;
-    }
-    int getLevel() {
-      return level;
-    }
-  }
-
-  /**
-   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
-   * where disabled by <tt>total.order.partitioner.natural.order</tt>,
-   * search the partition keyset with a binary search.
-   */
-  class BinarySearchNode implements Node<K> {
-    private final K[] splitPoints;
-    private final RawComparator<K> comparator;
-    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
-      this.splitPoints = splitPoints;
-      this.comparator = comparator;
-    }
-    public int findPartition(K key) {
-      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
-      return (pos < 0) ? -pos : pos;
-    }
+    super.setConf(job);
   }
 
-  /**
-   * An inner trie node that contains 256 children based on the next
-   * character.
-   */
-  class InnerTrieNode extends TrieNode {
-    private TrieNode[] child = new TrieNode[256];
-
-    InnerTrieNode(int level) {
-      super(level);
-    }
-    public int findPartition(BinaryComparable key) {
-      int level = getLevel();
-      if (key.getLength() <= level) {
-        return child[0].findPartition(key);
-      }
-      return child[0xFF & key.getBytes()[level]].findPartition(key);
-    }
-  }
-  
-  /**
-   * @param level        the tree depth at this node
-   * @param splitPoints  the full split point vector, which holds
-   *                     the split point or points this leaf node
-   *                     should contain
-   * @param lower        first INcluded element of splitPoints
-   * @param upper        first EXcluded element of splitPoints
-   * @return  a leaf node.  They come in three kinds: no split points 
-   *          [and the findParttion returns a canned index], one split
-   *          point [and we compare with a single comparand], or more
-   *          than one [and we do a binary search].  The last case is
-   *          rare.
-   */
-  private TrieNode LeafTrieNodeFactory
-             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
-      switch (upper - lower) {
-      case 0:
-          return new UnsplitTrieNode(level, lower);
-          
-      case 1:
-          return new SinglySplitTrieNode(level, splitPoints, lower);
-          
-      default:
-          return new LeafTrieNode(level, splitPoints, lower, upper);
-      }
-  }
-
-  /**
-   * A leaf trie node that scans for the key between lower..upper.
-   * 
-   * We don't generate many of these now, since we usually continue trie-ing 
-   * when more than one split point remains at this level. and we make different
-   * objects for nodes with 0 or 1 split point.
-   */
-  private class LeafTrieNode extends TrieNode {
-    final int lower;
-    final int upper;
-    final BinaryComparable[] splitPoints;
-    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
-      super(level);
-      this.lower = lower;
-      this.upper = upper;
-      this.splitPoints = splitPoints;
-    }
-    public int findPartition(BinaryComparable key) {
-      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
-      return (pos < 0) ? -pos : pos;
-    }
-  }
-  
-  private class UnsplitTrieNode extends TrieNode {
-      final int result;
-      
-      UnsplitTrieNode(int level, int value) {
-          super(level);
-          this.result = value;
-      }
-      
-      public int findPartition(BinaryComparable key) {
-          return result;
-      }
-  }
-  
-  private class SinglySplitTrieNode extends TrieNode {
-      final int               lower;
-      final BinaryComparable  mySplitPoint;
-      
-      SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
-          super(level);
-          this.lower = lower;
-          this.mySplitPoint = splitPoints[lower];
-      }
-      
-      public int findPartition(BinaryComparable key) {
-          return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
-      }
-  }
-
-
-  /**
-   * Read the cut points from the given IFile.
-   * @param fs The file system
-   * @param p The path to read
-   * @param keyClass The map output key class
-   * @param job The job config
-   * @throws IOException
-   */
-                                 // matching key types enforced by passing in
-  @SuppressWarnings("unchecked") // map output key class
-  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
-      JobConf job) throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
-    ArrayList<K> parts = new ArrayList<K>();
-    K key = (K) ReflectionUtils.newInstance(keyClass, job);
-    NullWritable value = NullWritable.get();
-    while (reader.next(key, value)) {
-      parts.add(key);
-      key = (K) ReflectionUtils.newInstance(keyClass, job);
-    }
-    reader.close();
-    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
-  }
-  
-  /**
-   * 
-   * This object contains a TrieNodeRef if there is such a thing that
-   * can be repeated.  Two adjacent trie node slots that contain no 
-   * split points can be filled with the same trie node, even if they
-   * are not on the same level.  See buildTreeRec, below.
-   *
-   */  
-  private class CarriedTrieNodeRef
-  {
-      TrieNode   content;
-      
-      CarriedTrieNodeRef() {
-          content = null;
-      }
-  }
-
-  
-  /**
-   * Given a sorted set of cut points, build a trie that will find the correct
-   * partition quickly.
-   * @param splits the list of cut points
-   * @param lower the lower bound of partitions 0..numPartitions-1
-   * @param upper the upper bound of partitions 0..numPartitions-1
-   * @param prefix the prefix that we have already checked against
-   * @param maxDepth the maximum depth we will build a trie for
-   * @return the trie node that will divide the splits correctly
-   */
-  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
-          int upper, byte[] prefix, int maxDepth) {
-      return buildTrieRec
-               (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
-  }
-  
-  /**
-   * This is the core of buildTrie.  The interface, and stub, above, just adds
-   * an empty CarriedTrieNodeRef.  
-   * 
-   * We build trie nodes in depth first order, which is also in key space
-   * order.  Every leaf node is referenced as a slot in a parent internal
-   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
-   * no split point, then they are not separated by a split point either, 
-   * because there's no place in key space for that split point to exist.
-   * 
-   * When that happens, the leaf nodes would be semantically identical, and
-   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the 
-   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
-   * leaf node for such reuse until a leaf node with a split arises, which 
-   * breaks the chain until we need to make a new unsplit leaf node.
-   * 
-   * Note that this use of CarriedTrieNodeRef means that for internal nodes, 
-   * for internal nodes if this code is modified in any way we still need 
-   * to make or fill in the subnodes in key space order.
-   */
-  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
-      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
-    final int depth = prefix.length;
-    // We generate leaves for a single split point as well as for 
-    // no split points.
-    if (depth >= maxDepth || lower >= upper - 1) {
-        // If we have two consecutive requests for an unsplit trie node, we
-        // can deliver the same one the second time.
-        if (lower == upper && ref.content != null) {
-            return ref.content;
-        }
-        TrieNode  result = LeafTrieNodeFactory(depth, splits, lower, upper);
-        ref.content = lower == upper ? result : null;
-        return result;
-    }
-    InnerTrieNode result = new InnerTrieNode(depth);
-    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
-    // append an extra byte on to the prefix
-    int         currentBound = lower;
-    for(int ch = 0; ch < 0xFF; ++ch) {
-      trial[depth] = (byte) (ch + 1);
-      lower = currentBound;
-      while (currentBound < upper) {
-        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
-          break;
-        }
-        currentBound += 1;
-      }
-      trial[depth] = (byte) ch;
-      result.child[0xFF & ch]
-                   = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
-    }
-    // pick up the rest
-    trial[depth] = (byte)0xFF;
-    result.child[0xFF] 
-                 = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
-    
-    return result;
-  }
 }

Copied: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?p2=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java&p1=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Mon May  4 04:18:22 2009
@@ -16,82 +16,74 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Utility for collecting samples and writing a partition file for
- * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ * {@link TotalOrderPartitioner}.
  */
-public class InputSampler<K,V> implements Tool {
+public class InputSampler<K,V> extends Configured implements Tool  {
 
   private static final Log LOG = LogFactory.getLog(InputSampler.class);
 
   static int printUsage() {
     System.out.println("sampler -r <reduces>\n" +
-                       "      [-inFormat <input format class>]\n" +
-                       "      [-keyClass <map input & output key class>]\n" +
-                       "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
-                       "// Sample from random splits at random (general)\n" +
-                       "       -splitSample <numSamples> <maxsplits> | " +
-                       "             // Sample from first records in splits (random data)\n"+
-                       "       -splitInterval <double pcnt> <maxsplits>]" +
-                       "             // Sample from splits at intervals (sorted data)");
+      "      [-inFormat <input format class>]\n" +
+      "      [-keyClass <map input & output key class>]\n" +
+      "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+      "// Sample from random splits at random (general)\n" +
+      "       -splitSample <numSamples> <maxsplits> | " +
+      "             // Sample from first records in splits (random data)\n"+
+      "       -splitInterval <double pcnt> <maxsplits>]" +
+      "             // Sample from splits at intervals (sorted data)");
     System.out.println("Default sampler: -splitRandom 0.1 10000 10");
     ToolRunner.printGenericCommandUsage(System.out);
     return -1;
   }
 
-  private JobConf conf;
-
-  public InputSampler(JobConf conf) {
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    if (!(conf instanceof JobConf)) {
-      this.conf = new JobConf(conf);
-    } else {
-      this.conf = (JobConf) conf;
-    }
+  public InputSampler(Configuration conf) {
+    setConf(conf);
   }
 
   /**
-   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   * Interface to sample using an 
+   * {@link org.apache.hadoop.mapreduce.InputFormat}.
    */
   public interface Sampler<K,V> {
     /**
      * For a given job, collect and return a subset of the keys from the
      * input data.
      */
-    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+    K[] getSample(InputFormat<K,V> inf, Job job) 
+    throws IOException, InterruptedException;
   }
 
   /**
@@ -128,21 +120,20 @@
      * From each split sampled, take the first numSamples / numSplits records.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-      int splitStep = splits.length / splitsToSample;
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
       int samplesPerSplit = numSamples / splitsToSample;
       long records = 0;
       for (int i = 0; i < splitsToSample; ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
-            job, Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
-          samples.add(key);
-          key = reader.createKey();
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep), 
+          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
+          samples.add(reader.getCurrentKey());
           ++records;
           if ((i+1) * samplesPerSplit <= records) {
             break;
@@ -195,35 +186,34 @@
      * the quota of keys from that split is satisfied.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
 
       Random r = new Random();
       long seed = r.nextLong();
       r.setSeed(seed);
       LOG.debug("seed: " + seed);
       // shuffle splits
-      for (int i = 0; i < splits.length; ++i) {
-        InputSplit tmp = splits[i];
-        int j = r.nextInt(splits.length);
-        splits[i] = splits[j];
-        splits[j] = tmp;
+      for (int i = 0; i < splits.size(); ++i) {
+        InputSplit tmp = splits.get(i);
+        int j = r.nextInt(splits.size());
+        splits.set(i, splits.get(j));
+        splits.set(j, tmp);
       }
       // our target rate is in terms of the maximum number of sample splits,
       // but we accept the possibility of sampling additional splits to hit
       // the target sample keyset
       for (int i = 0; i < splitsToSample ||
-                     (i < splits.length && samples.size() < numSamples); ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
-            Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
+                     (i < splits.size() && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.createRecordReader(splits.get(i), 
+          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
           if (r.nextDouble() <= freq) {
             if (samples.size() < numSamples) {
-              samples.add(key);
+              samples.add(reader.getCurrentKey());
             } else {
               // When exceeding the maximum number of samples, replace a
               // random element with this one, then adjust the frequency
@@ -231,11 +221,10 @@
               // pushed out
               int ind = r.nextInt(numSamples);
               if (ind != numSamples) {
-                samples.set(ind, key);
+                samples.set(ind, reader.getCurrentKey());
               }
               freq *= (numSamples - 1) / (double) numSamples;
             }
-            key = reader.createKey();
           }
         }
         reader.close();
@@ -277,24 +266,23 @@
      * frequency.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
-      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>();
-      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-      int splitStep = splits.length / splitsToSample;
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int splitStep = splits.size() / splitsToSample;
       long records = 0;
       long kept = 0;
       for (int i = 0; i < splitsToSample; ++i) {
-        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
-            job, Reporter.NULL);
-        K key = reader.createKey();
-        V value = reader.createValue();
-        while (reader.next(key, value)) {
+        RecordReader<K,V> reader = inf.createRecordReader(
+          splits.get(i * splitStep),
+          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+        while (reader.nextKeyValue()) {
           ++records;
           if ((double) kept / records < freq) {
             ++kept;
-            samples.add(key);
-            key = reader.createKey();
+            samples.add(reader.getCurrentKey());
           }
         }
         reader.close();
@@ -307,26 +295,27 @@
    * Write a partition file for the given job, using the Sampler provided.
    * Queries the sampler for a sample keyset, sorts by the output key
    * comparator, selects the keys for each rank, and writes to the destination
-   * returned from {@link
-     org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
+   * returned from {@link TotalOrderPartitioner#getPartitionFile}.
    */
   @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
-  public static <K,V> void writePartitionFile(JobConf job,
-      Sampler<K,V> sampler) throws IOException {
-    final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
+  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    final InputFormat inf = 
+        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
     int numPartitions = job.getNumReduceTasks();
     K[] samples = sampler.getSample(inf, job);
     LOG.info("Using " + samples.length + " samples");
     RawComparator<K> comparator =
-      (RawComparator<K>) job.getOutputKeyComparator();
+      (RawComparator<K>) job.getSortComparator();
     Arrays.sort(samples, comparator);
-    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
-    FileSystem fs = dst.getFileSystem(job);
+    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    FileSystem fs = dst.getFileSystem(conf);
     if (fs.exists(dst)) {
       fs.delete(dst, false);
     }
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
-        job.getMapOutputKeyClass(), NullWritable.class);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
+      conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
     NullWritable nullValue = NullWritable.get();
     float stepSize = samples.length / (float) numPartitions;
     int last = -1;
@@ -346,7 +335,7 @@
    * Configures a JobConf instance and calls {@link #writePartitionFile}.
    */
   public int run(String[] args) throws Exception {
-    JobConf job = (JobConf) getConf();
+    Job job = new Job(getConf());
     ArrayList<String> otherArgs = new ArrayList<String>();
     Sampler<K,V> sampler = null;
     for(int i=0; i < args.length; ++i) {
@@ -354,7 +343,7 @@
         if ("-r".equals(args[i])) {
           job.setNumReduceTasks(Integer.parseInt(args[++i]));
         } else if ("-inFormat".equals(args[i])) {
-          job.setInputFormat(
+          job.setInputFormatClass(
               Class.forName(args[++i]).asSubclass(InputFormat.class));
         } else if ("-keyClass".equals(args[i])) {
           job.setMapOutputKeyClass(
@@ -400,7 +389,7 @@
     }
 
     Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
-    TotalOrderPartitioner.setPartitionFile(job, outf);
+    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
     for (String s : otherArgs) {
       FileInputFormat.addInputPath(job, new Path(s));
     }
@@ -410,8 +399,7 @@
   }
 
   public static void main(String[] args) throws Exception {
-    JobConf job = new JobConf(InputSampler.class);
-    InputSampler<?,?> sampler = new InputSampler(job);
+    InputSampler<?,?> sampler = new InputSampler(new Configuration());
     int res = ToolRunner.run(sampler, args);
     System.exit(res);
   }

Copied: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?p2=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java&p1=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Mon May  4 04:18:22 2009
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
 
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BinaryComparable;
@@ -30,19 +32,20 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Partitioner effecting a total order by reading split points from
  * an externally generated source.
  */
-public class TotalOrderPartitioner<K extends WritableComparable,V>
-    implements Partitioner<K,V> {
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+    extends Partitioner<K,V> implements Configurable {
 
   private Node partitions;
   public static final String DEFAULT_PATH = "_partition.lst";
+  Configuration conf;
 
   public TotalOrderPartitioner() { }
 
@@ -54,32 +57,33 @@
    * will be built. Otherwise, keys will be located using a binary search of
    * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
    * defined for this job. The input file must be sorted with the same
-   * comparator and contain {@link
-     org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
+   * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
    */
   @SuppressWarnings("unchecked") // keytype from conf not static
-  public void configure(JobConf job) {
+  public void setConf(Configuration conf) {
     try {
-      String parts = getPartitionFile(job);
+      this.conf = conf;
+      String parts = getPartitionFile(conf);
       final Path partFile = new Path(parts);
       final FileSystem fs = (DEFAULT_PATH.equals(parts))
-        ? FileSystem.getLocal(job)     // assume in DistributedCache
-        : partFile.getFileSystem(job);
+        ? FileSystem.getLocal(conf)     // assume in DistributedCache
+        : partFile.getFileSystem(conf);
 
+      Job job = new Job(conf);
       Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-      K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
       if (splitPoints.length != job.getNumReduceTasks() - 1) {
         throw new IOException("Wrong number of partitions in keyset");
       }
       RawComparator<K> comparator =
-        (RawComparator<K>) job.getOutputKeyComparator();
+        (RawComparator<K>) job.getSortComparator();
       for (int i = 0; i < splitPoints.length - 1; ++i) {
         if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
           throw new IOException("Split points are out of order");
         }
       }
       boolean natOrder =
-        job.getBoolean("total.order.partitioner.natural.order", true);
+        conf.getBoolean("total.order.partitioner.natural.order", true);
       if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
         partitions = buildTrie((BinaryComparable[])splitPoints, 0,
             splitPoints.length, new byte[0],
@@ -90,7 +94,7 @@
             // case where the split points are long and mostly look like bytes 
             // iii...iixii...iii   .  Therefore, we make the default depth
             // limit large but not huge.
-            job.getInt("total.order.partitioner.max.trie.depth", 200));
+            conf.getInt("total.order.partitioner.max.trie.depth", 200));
       } else {
         partitions = new BinarySearchNode(splitPoints, comparator);
       }
@@ -99,7 +103,11 @@
     }
   }
 
-                                 // by construction, we know if our keytype
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  // by construction, we know if our keytype
   @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
   public int getPartition(K key, V value, int numPartitions) {
     return partitions.findPartition(key);
@@ -110,16 +118,16 @@
    * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
    * keys in the SequenceFile.
    */
-  public static void setPartitionFile(JobConf job, Path p) {
-    job.set("total.order.partitioner.path", p.toString());
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set("total.order.partitioner.path", p.toString());
   }
 
   /**
    * Get the path to the SequenceFile storing the sorted partition keyset.
-   * @see #setPartitionFile(JobConf,Path)
+   * @see #setPartitionFile(Configuration, Path)
    */
-  public static String getPartitionFile(JobConf job) {
-    return job.get("total.order.partitioner.path", DEFAULT_PATH);
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get("total.order.partitioner.path", DEFAULT_PATH);
   }
 
   /**
@@ -275,14 +283,14 @@
                                  // matching key types enforced by passing in
   @SuppressWarnings("unchecked") // map output key class
   private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
-      JobConf job) throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+      Configuration conf) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
     ArrayList<K> parts = new ArrayList<K>();
-    K key = (K) ReflectionUtils.newInstance(keyClass, job);
+    K key = (K) ReflectionUtils.newInstance(keyClass, conf);
     NullWritable value = NullWritable.get();
     while (reader.next(key, value)) {
       parts.add(key);
-      key = (K) ReflectionUtils.newInstance(keyClass, job);
+      key = (K) ReflectionUtils.newInstance(keyClass, conf);
     }
     reader.close();
     return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));

Copied: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java (from r771170, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?p2=hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java&p1=hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Mon May  4 04:18:22 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -24,6 +24,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -33,7 +34,6 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
 
 public class TestTotalOrderPartitioner extends TestCase {
 
@@ -75,17 +75,16 @@
     testStrings.add(new Check<Text>(new Text("hi"), 6));
   };
 
-  private static <T extends WritableComparable> Path writePartitionFile(
-      String testname, JobConf conf, T[] splits) throws IOException {
+  private static <T extends WritableComparable<?>> Path writePartitionFile(
+      String testname, Configuration conf, T[] splits) throws IOException {
     final FileSystem fs = FileSystem.getLocal(conf);
     final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
                                  ).makeQualified(fs);
     Path p = new Path(testdir, testname + "/_partition.lst");
     TotalOrderPartitioner.setPartitionFile(conf, p);
-    conf.setNumReduceTasks(splits.length + 1);
+    conf.setInt("mapred.reduce.tasks", splits.length + 1);
     SequenceFile.Writer w = null;
     try {
-      NullWritable nw = NullWritable.get();
       w = SequenceFile.createWriter(fs, conf, p,
           splits[0].getClass(), NullWritable.class,
           SequenceFile.CompressionType.NONE);
@@ -102,39 +101,39 @@
   public void testTotalOrderMemCmp() throws Exception {
     TotalOrderPartitioner<Text,NullWritable> partitioner =
       new TotalOrderPartitioner<Text,NullWritable>();
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
-        "totalordermemcmp", job, splitStrings);
-    job.setMapOutputKeyClass(Text.class);
+        "totalordermemcmp", conf, splitStrings);
+    conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
     try {
-      partitioner.configure(job);
+      partitioner.setConf(conf);
       NullWritable nw = NullWritable.get();
       for (Check<Text> chk : testStrings) {
         assertEquals(chk.data.toString(), chk.part,
             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
       }
     } finally {
-      p.getFileSystem(job).delete(p, true);
+      p.getFileSystem(conf).delete(p, true);
     }
   }
 
   public void testTotalOrderBinarySearch() throws Exception {
     TotalOrderPartitioner<Text,NullWritable> partitioner =
       new TotalOrderPartitioner<Text,NullWritable>();
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
-        "totalorderbinarysearch", job, splitStrings);
-    job.setBoolean("total.order.partitioner.natural.order", false);
-    job.setMapOutputKeyClass(Text.class);
+        "totalorderbinarysearch", conf, splitStrings);
+    conf.setBoolean("total.order.partitioner.natural.order", false);
+    conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
     try {
-      partitioner.configure(job);
+      partitioner.setConf(conf);
       NullWritable nw = NullWritable.get();
       for (Check<Text> chk : testStrings) {
         assertEquals(chk.data.toString(), chk.part,
             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
       }
     } finally {
-      p.getFileSystem(job).delete(p, true);
+      p.getFileSystem(conf).delete(p, true);
     }
   }
 
@@ -153,14 +152,15 @@
   public void testTotalOrderCustomComparator() throws Exception {
     TotalOrderPartitioner<Text,NullWritable> partitioner =
       new TotalOrderPartitioner<Text,NullWritable>();
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
     Arrays.sort(revSplitStrings, new ReverseStringComparator());
     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
-        "totalordercustomcomparator", job, revSplitStrings);
-    job.setBoolean("total.order.partitioner.natural.order", false);
-    job.setMapOutputKeyClass(Text.class);
-    job.setOutputKeyComparatorClass(ReverseStringComparator.class);
+        "totalordercustomcomparator", conf, revSplitStrings);
+    conf.setBoolean("total.order.partitioner.natural.order", false);
+    conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+    conf.setClass("mapred.output.key.comparator.class",
+      ReverseStringComparator.class, RawComparator.class);
     ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
     revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
     revCheck.add(new Check<Text>(new Text("aaabb"), 9));
@@ -174,15 +174,14 @@
     revCheck.add(new Check<Text>(new Text("ddngo"), 4));
     revCheck.add(new Check<Text>(new Text("hi"), 3));
     try {
-      partitioner.configure(job);
+      partitioner.setConf(conf);
       NullWritable nw = NullWritable.get();
       for (Check<Text> chk : revCheck) {
         assertEquals(chk.data.toString(), chk.part,
             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
       }
     } finally {
-      p.getFileSystem(job).delete(p, true);
+      p.getFileSystem(conf).delete(p, true);
     }
   }
-
 }