Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/05/04 06:18:23 UTC
svn commit: r771171 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/lib/
src/mapred/org/apache/hadoop/mapreduce/lib/partition/
src/test/org/apache/hadoop/mapred/lib/
src/test/org/apache/hadoop/mapreduce/lib/partition/
Author: cdouglas
Date: Mon May 4 04:18:22 2009
New Revision: 771171
URL: http://svn.apache.org/viewvc?rev=771171&view=rev
Log:
HADOOP-5668. Change TotalOrderPartitioner to use new API. Contributed by Amareshwari Sriramadasu
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
- copied, changed from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
- copied, changed from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
- copied, changed from r771170, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
Removed:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
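Taken together, the classes copied into mapreduce/lib/partition let a job written against the new org.apache.hadoop.mapreduce API sample its input and partition by total order. The sketch below is illustrative only and is not part of this commit: the driver class name, input format, key type, paths, and reducer count are placeholder assumptions; only the InputSampler and TotalOrderPartitioner calls mirror the API added here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSortSketch {                        // hypothetical driver
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setNumReduceTasks(4);                              // assumed reducer count
    job.setInputFormatClass(SequenceFileInputFormat.class); // assumed input format
    job.setMapOutputKeyClass(Text.class);                  // assumed key type
    FileInputFormat.addInputPath(job, new Path(args[0]));  // assumed input path
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // assumed output path

    // Point the partitioner at a split-point file, then sample the input to
    // write numReduceTasks - 1 sorted keys into that file.
    Path partitionFile = new Path(args[1] + "_partitions"); // assumed location
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    InputSampler.writePartitionFile(job,
        new InputSampler.RandomSampler<Text, NullWritable>(0.1, 10000, 10));

    job.setPartitionerClass(TotalOrderPartitioner.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}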
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 4 04:18:22 2009
@@ -44,6 +44,9 @@
and TASK_CLEANUP. Removes the isMap methods from TaskID/TaskAttemptID
classes. (ddas)
+ HADOOP-5668. Change TotalOrderPartitioner to use new API. (Amareshwari
+ Sriramadasu via cdouglas)
+
NEW FEATURES
HADOOP-4268. Change fsck to use ClientProtocol methods so that the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java Mon May 4 04:18:22 2009
@@ -19,400 +19,24 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.Job;
/**
- * Utility for collecting samples and writing a partition file for
- * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.partition.InputSampler}
*/
-public class InputSampler<K,V> implements Tool {
-
- private static final Log LOG = LogFactory.getLog(InputSampler.class);
-
- static int printUsage() {
- System.out.println("sampler -r <reduces>\n" +
- " [-inFormat <input format class>]\n" +
- " [-keyClass <map input & output key class>]\n" +
- " [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
- "// Sample from random splits at random (general)\n" +
- " -splitSample <numSamples> <maxsplits> | " +
- " // Sample from first records in splits (random data)\n"+
- " -splitInterval <double pcnt> <maxsplits>]" +
- " // Sample from splits at intervals (sorted data)");
- System.out.println("Default sampler: -splitRandom 0.1 10000 10");
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- }
-
- private JobConf conf;
+@Deprecated
+public class InputSampler<K,V> extends
+ org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
public InputSampler(JobConf conf) {
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- this.conf = new JobConf(conf);
- } else {
- this.conf = (JobConf) conf;
- }
- }
-
- /**
- * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
- */
- public interface Sampler<K,V> {
- /**
- * For a given job, collect and return a subset of the keys from the
- * input data.
- */
- K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
- }
-
- /**
- * Samples the first n records from s splits.
- * Inexpensive way to sample random data.
- */
- public static class SplitSampler<K,V> implements Sampler<K,V> {
-
- private final int numSamples;
- private final int maxSplitsSampled;
-
- /**
- * Create a SplitSampler sampling <em>all</em> splits.
- * Takes the first numSamples / numSplits records from each split.
- * @param numSamples Total number of samples to obtain from all selected
- * splits.
- */
- public SplitSampler(int numSamples) {
- this(numSamples, Integer.MAX_VALUE);
- }
-
- /**
- * Create a new SplitSampler.
- * @param numSamples Total number of samples to obtain from all selected
- * splits.
- * @param maxSplitsSampled The maximum number of splits to examine.
- */
- public SplitSampler(int numSamples, int maxSplitsSampled) {
- this.numSamples = numSamples;
- this.maxSplitsSampled = maxSplitsSampled;
- }
-
- /**
- * From each split sampled, take the first numSamples / numSplits records.
- */
- @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
- ArrayList<K> samples = new ArrayList<K>(numSamples);
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
- int splitStep = splits.length / splitsToSample;
- int samplesPerSplit = numSamples / splitsToSample;
- long records = 0;
- for (int i = 0; i < splitsToSample; ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
- job, Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
- samples.add(key);
- key = reader.createKey();
- ++records;
- if ((i+1) * samplesPerSplit <= records) {
- break;
- }
- }
- reader.close();
- }
- return (K[])samples.toArray();
- }
- }
-
- /**
- * Sample from random points in the input.
- * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
- * each split.
- */
- public static class RandomSampler<K,V> implements Sampler<K,V> {
- private double freq;
- private final int numSamples;
- private final int maxSplitsSampled;
-
- /**
- * Create a new RandomSampler sampling <em>all</em> splits.
- * This will read every split at the client, which is very expensive.
- * @param freq Probability with which a key will be chosen.
- * @param numSamples Total number of samples to obtain from all selected
- * splits.
- */
- public RandomSampler(double freq, int numSamples) {
- this(freq, numSamples, Integer.MAX_VALUE);
- }
-
- /**
- * Create a new RandomSampler.
- * @param freq Probability with which a key will be chosen.
- * @param numSamples Total number of samples to obtain from all selected
- * splits.
- * @param maxSplitsSampled The maximum number of splits to examine.
- */
- public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
- this.freq = freq;
- this.numSamples = numSamples;
- this.maxSplitsSampled = maxSplitsSampled;
- }
-
- /**
- * Randomize the split order, then take the specified number of keys from
- * each split sampled, where each key is selected with the specified
- * probability and possibly replaced by a subsequently selected key when
- * the quota of keys from that split is satisfied.
- */
- @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
- ArrayList<K> samples = new ArrayList<K>(numSamples);
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- LOG.debug("seed: " + seed);
- // shuffle splits
- for (int i = 0; i < splits.length; ++i) {
- InputSplit tmp = splits[i];
- int j = r.nextInt(splits.length);
- splits[i] = splits[j];
- splits[j] = tmp;
- }
- // our target rate is in terms of the maximum number of sample splits,
- // but we accept the possibility of sampling additional splits to hit
- // the target sample keyset
- for (int i = 0; i < splitsToSample ||
- (i < splits.length && samples.size() < numSamples); ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
- Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
- if (r.nextDouble() <= freq) {
- if (samples.size() < numSamples) {
- samples.add(key);
- } else {
- // When exceeding the maximum number of samples, replace a
- // random element with this one, then adjust the frequency
- // to reflect the possibility of existing elements being
- // pushed out
- int ind = r.nextInt(numSamples);
- if (ind != numSamples) {
- samples.set(ind, key);
- }
- freq *= (numSamples - 1) / (double) numSamples;
- }
- key = reader.createKey();
- }
- }
- reader.close();
- }
- return (K[])samples.toArray();
- }
- }
-
- /**
- * Sample from s splits at regular intervals.
- * Useful for sorted data.
- */
- public static class IntervalSampler<K,V> implements Sampler<K,V> {
- private final double freq;
- private final int maxSplitsSampled;
-
- /**
- * Create a new IntervalSampler sampling <em>all</em> splits.
- * @param freq The frequency with which records will be emitted.
- */
- public IntervalSampler(double freq) {
- this(freq, Integer.MAX_VALUE);
- }
-
- /**
- * Create a new IntervalSampler.
- * @param freq The frequency with which records will be emitted.
- * @param maxSplitsSampled The maximum number of splits to examine.
- * @see #getSample
- */
- public IntervalSampler(double freq, int maxSplitsSampled) {
- this.freq = freq;
- this.maxSplitsSampled = maxSplitsSampled;
- }
-
- /**
- * For each split sampled, emit when the ratio of the number of records
- * retained to the total record count is less than the specified
- * frequency.
- */
- @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
- ArrayList<K> samples = new ArrayList<K>();
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
- int splitStep = splits.length / splitsToSample;
- long records = 0;
- long kept = 0;
- for (int i = 0; i < splitsToSample; ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
- job, Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
- ++records;
- if ((double) kept / records < freq) {
- ++kept;
- samples.add(key);
- key = reader.createKey();
- }
- }
- reader.close();
- }
- return (K[])samples.toArray();
- }
- }
-
- /**
- * Write a partition file for the given job, using the Sampler provided.
- * Queries the sampler for a sample keyset, sorts by the output key
- * comparator, selects the keys for each rank, and writes to the destination
- * returned from {@link
- org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
- */
- @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
- public static <K,V> void writePartitionFile(JobConf job,
- Sampler<K,V> sampler) throws IOException {
- final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
- int numPartitions = job.getNumReduceTasks();
- K[] samples = sampler.getSample(inf, job);
- LOG.info("Using " + samples.length + " samples");
- RawComparator<K> comparator =
- (RawComparator<K>) job.getOutputKeyComparator();
- Arrays.sort(samples, comparator);
- Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
- FileSystem fs = dst.getFileSystem(job);
- if (fs.exists(dst)) {
- fs.delete(dst, false);
- }
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
- job.getMapOutputKeyClass(), NullWritable.class);
- NullWritable nullValue = NullWritable.get();
- float stepSize = samples.length / (float) numPartitions;
- int last = -1;
- for(int i = 1; i < numPartitions; ++i) {
- int k = Math.round(stepSize * i);
- while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
- ++k;
- }
- writer.append(samples[k], nullValue);
- last = k;
- }
- writer.close();
- }
-
- /**
- * Driver for InputSampler from the command line.
- * Configures a JobConf instance and calls {@link #writePartitionFile}.
- */
- public int run(String[] args) throws Exception {
- JobConf job = (JobConf) getConf();
- ArrayList<String> otherArgs = new ArrayList<String>();
- Sampler<K,V> sampler = null;
- for(int i=0; i < args.length; ++i) {
- try {
- if ("-r".equals(args[i])) {
- job.setNumReduceTasks(Integer.parseInt(args[++i]));
- } else if ("-inFormat".equals(args[i])) {
- job.setInputFormat(
- Class.forName(args[++i]).asSubclass(InputFormat.class));
- } else if ("-keyClass".equals(args[i])) {
- job.setMapOutputKeyClass(
- Class.forName(args[++i]).asSubclass(WritableComparable.class));
- } else if ("-splitSample".equals(args[i])) {
- int numSamples = Integer.parseInt(args[++i]);
- int maxSplits = Integer.parseInt(args[++i]);
- if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
- sampler = new SplitSampler<K,V>(numSamples, maxSplits);
- } else if ("-splitRandom".equals(args[i])) {
- double pcnt = Double.parseDouble(args[++i]);
- int numSamples = Integer.parseInt(args[++i]);
- int maxSplits = Integer.parseInt(args[++i]);
- if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
- sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
- } else if ("-splitInterval".equals(args[i])) {
- double pcnt = Double.parseDouble(args[++i]);
- int maxSplits = Integer.parseInt(args[++i]);
- if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
- sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
- } else {
- otherArgs.add(args[i]);
- }
- } catch (NumberFormatException except) {
- System.out.println("ERROR: Integer expected instead of " + args[i]);
- return printUsage();
- } catch (ArrayIndexOutOfBoundsException except) {
- System.out.println("ERROR: Required parameter missing from " +
- args[i-1]);
- return printUsage();
- }
- }
- if (job.getNumReduceTasks() <= 1) {
- System.err.println("Sampler requires more than one reducer");
- return printUsage();
- }
- if (otherArgs.size() < 2) {
- System.out.println("ERROR: Wrong number of parameters: ");
- return printUsage();
- }
- if (null == sampler) {
- sampler = new RandomSampler<K,V>(0.1, 10000, 10);
- }
-
- Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
- TotalOrderPartitioner.setPartitionFile(job, outf);
- for (String s : otherArgs) {
- FileInputFormat.addInputPath(job, new Path(s));
- }
- InputSampler.<K,V>writePartitionFile(job, sampler);
-
- return 0;
+ super(conf);
}
- public static void main(String[] args) throws Exception {
- JobConf job = new JobConf(InputSampler.class);
- InputSampler<?,?> sampler = new InputSampler(job);
- int res = ToolRunner.run(sampler, args);
- System.exit(res);
+ public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ writePartitionFile(new Job(job), sampler);
}
}
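The deprecated class above keeps the JobConf entry point alive by wrapping the conf in a mapreduce Job and delegating to the new implementation. A minimal sketch of the call shape only (key types and reducer count are placeholders; input format and input paths are omitted and would have to be configured for a real job; note that writePartitionFile now also declares ClassNotFoundException and InterruptedException):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;

public class OldApiSamplerSketch {                 // hypothetical caller
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.setNumReduceTasks(4);                     // assumed reducer count
    // The nested Sampler/RandomSampler types are now inherited from the
    // mapreduce InputSampler; the deprecated static below wraps conf in a Job.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
    InputSampler.writePartitionFile(conf, sampler); // also throws ClassNotFoundException
                                                    // and InterruptedException now
  }
}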
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=771171&r1=771170&r2=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Mon May 4 04:18:22 2009
@@ -18,367 +18,26 @@
package org.apache.hadoop.mapred.lib;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Arrays;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.util.ReflectionUtils;
/**
* Partitioner effecting a total order by reading split points from
* an externally generated source.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner}
*/
-public class TotalOrderPartitioner<K extends WritableComparable,V>
+@Deprecated
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+ extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
implements Partitioner<K,V> {
- private Node partitions;
- public static final String DEFAULT_PATH = "_partition.lst";
-
public TotalOrderPartitioner() { }
- /**
- * Read in the partition file and build indexing data structures.
- * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
- * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
- * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
- * will be built. Otherwise, keys will be located using a binary search of
- * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
- * defined for this job. The input file must be sorted with the same
- * comparator and contain {@link
- org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
- */
- @SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
- try {
- String parts = getPartitionFile(job);
- final Path partFile = new Path(parts);
- final FileSystem fs = (DEFAULT_PATH.equals(parts))
- ? FileSystem.getLocal(job) // assume in DistributedCache
- : partFile.getFileSystem(job);
-
- Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
- K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
- if (splitPoints.length != job.getNumReduceTasks() - 1) {
- throw new IOException("Wrong number of partitions in keyset");
- }
- RawComparator<K> comparator =
- (RawComparator<K>) job.getOutputKeyComparator();
- for (int i = 0; i < splitPoints.length - 1; ++i) {
- if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
- throw new IOException("Split points are out of order");
- }
- }
- boolean natOrder =
- job.getBoolean("total.order.partitioner.natural.order", true);
- if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
- partitions = buildTrie((BinaryComparable[])splitPoints, 0,
- splitPoints.length, new byte[0],
- // Now that blocks of identical splitless trie nodes are
- // represented reentrantly, and we develop a leaf for any trie
- // node with only one split point, the only reason for a depth
- // limit is to refute stack overflow or bloat in the pathological
- // case where the split points are long and mostly look like bytes
- // iii...iixii...iii . Therefore, we make the default depth
- // limit large but not huge.
- job.getInt("total.order.partitioner.max.trie.depth", 200));
- } else {
- partitions = new BinarySearchNode(splitPoints, comparator);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("Can't read partitions file", e);
- }
- }
-
- // by construction, we know if our keytype
- @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
- public int getPartition(K key, V value, int numPartitions) {
- return partitions.findPartition(key);
- }
-
- /**
- * Set the path to the SequenceFile storing the sorted partition keyset.
- * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
- * keys in the SequenceFile.
- */
- public static void setPartitionFile(JobConf job, Path p) {
- job.set("total.order.partitioner.path", p.toString());
- }
-
- /**
- * Get the path to the SequenceFile storing the sorted partition keyset.
- * @see #setPartitionFile(JobConf,Path)
- */
- public static String getPartitionFile(JobConf job) {
- return job.get("total.order.partitioner.path", DEFAULT_PATH);
- }
-
- /**
- * Interface to the partitioner to locate a key in the partition keyset.
- */
- interface Node<T> {
- /**
- * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
- * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
- */
- int findPartition(T key);
- }
-
- /**
- * Base class for trie nodes. If the keytype is memcomp-able, this builds
- * tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
- * bytes.
- */
- static abstract class TrieNode implements Node<BinaryComparable> {
- private final int level;
- TrieNode(int level) {
- this.level = level;
- }
- int getLevel() {
- return level;
- }
- }
-
- /**
- * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
- * where disabled by <tt>total.order.partitioner.natural.order</tt>,
- * search the partition keyset with a binary search.
- */
- class BinarySearchNode implements Node<K> {
- private final K[] splitPoints;
- private final RawComparator<K> comparator;
- BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
- this.splitPoints = splitPoints;
- this.comparator = comparator;
- }
- public int findPartition(K key) {
- final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
- return (pos < 0) ? -pos : pos;
- }
+ super.setConf(job);
}
- /**
- * An inner trie node that contains 256 children based on the next
- * character.
- */
- class InnerTrieNode extends TrieNode {
- private TrieNode[] child = new TrieNode[256];
-
- InnerTrieNode(int level) {
- super(level);
- }
- public int findPartition(BinaryComparable key) {
- int level = getLevel();
- if (key.getLength() <= level) {
- return child[0].findPartition(key);
- }
- return child[0xFF & key.getBytes()[level]].findPartition(key);
- }
- }
-
- /**
- * @param level the tree depth at this node
- * @param splitPoints the full split point vector, which holds
- * the split point or points this leaf node
- * should contain
- * @param lower first INcluded element of splitPoints
- * @param upper first EXcluded element of splitPoints
- * @return a leaf node. They come in three kinds: no split points
- * [and the findPartition returns a canned index], one split
- * point [and we compare with a single comparand], or more
- * than one [and we do a binary search]. The last case is
- * rare.
- */
- private TrieNode LeafTrieNodeFactory
- (int level, BinaryComparable[] splitPoints, int lower, int upper) {
- switch (upper - lower) {
- case 0:
- return new UnsplitTrieNode(level, lower);
-
- case 1:
- return new SinglySplitTrieNode(level, splitPoints, lower);
-
- default:
- return new LeafTrieNode(level, splitPoints, lower, upper);
- }
- }
-
- /**
- * A leaf trie node that scans for the key between lower..upper.
- *
- * We don't generate many of these now, since we usually continue trie-ing
- * when more than one split point remains at this level. and we make different
- * objects for nodes with 0 or 1 split point.
- */
- private class LeafTrieNode extends TrieNode {
- final int lower;
- final int upper;
- final BinaryComparable[] splitPoints;
- LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
- super(level);
- this.lower = lower;
- this.upper = upper;
- this.splitPoints = splitPoints;
- }
- public int findPartition(BinaryComparable key) {
- final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
- return (pos < 0) ? -pos : pos;
- }
- }
-
- private class UnsplitTrieNode extends TrieNode {
- final int result;
-
- UnsplitTrieNode(int level, int value) {
- super(level);
- this.result = value;
- }
-
- public int findPartition(BinaryComparable key) {
- return result;
- }
- }
-
- private class SinglySplitTrieNode extends TrieNode {
- final int lower;
- final BinaryComparable mySplitPoint;
-
- SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
- super(level);
- this.lower = lower;
- this.mySplitPoint = splitPoints[lower];
- }
-
- public int findPartition(BinaryComparable key) {
- return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
- }
- }
-
-
- /**
- * Read the cut points from the given IFile.
- * @param fs The file system
- * @param p The path to read
- * @param keyClass The map output key class
- * @param job The job config
- * @throws IOException
- */
- // matching key types enforced by passing in
- @SuppressWarnings("unchecked") // map output key class
- private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
- JobConf job) throws IOException {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
- ArrayList<K> parts = new ArrayList<K>();
- K key = (K) ReflectionUtils.newInstance(keyClass, job);
- NullWritable value = NullWritable.get();
- while (reader.next(key, value)) {
- parts.add(key);
- key = (K) ReflectionUtils.newInstance(keyClass, job);
- }
- reader.close();
- return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
- }
-
- /**
- *
- * This object contains a TrieNodeRef if there is such a thing that
- * can be repeated. Two adjacent trie node slots that contain no
- * split points can be filled with the same trie node, even if they
- * are not on the same level. See buildTreeRec, below.
- *
- */
- private class CarriedTrieNodeRef
- {
- TrieNode content;
-
- CarriedTrieNodeRef() {
- content = null;
- }
- }
-
-
- /**
- * Given a sorted set of cut points, build a trie that will find the correct
- * partition quickly.
- * @param splits the list of cut points
- * @param lower the lower bound of partitions 0..numPartitions-1
- * @param upper the upper bound of partitions 0..numPartitions-1
- * @param prefix the prefix that we have already checked against
- * @param maxDepth the maximum depth we will build a trie for
- * @return the trie node that will divide the splits correctly
- */
- private TrieNode buildTrie(BinaryComparable[] splits, int lower,
- int upper, byte[] prefix, int maxDepth) {
- return buildTrieRec
- (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
- }
-
- /**
- * This is the core of buildTrie. The interface, and stub, above, just adds
- * an empty CarriedTrieNodeRef.
- *
- * We build trie nodes in depth first order, which is also in key space
- * order. Every leaf node is referenced as a slot in a parent internal
- * node. If two adjacent slots [in the DFO] hold leaf nodes that have
- * no split point, then they are not separated by a split point either,
- * because there's no place in key space for that split point to exist.
- *
- * When that happens, the leaf nodes would be semantically identical, and
- * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
- * duration of the tree-walk. ref carries a potentially reusable, unsplit
- * leaf node for such reuse until a leaf node with a split arises, which
- * breaks the chain until we need to make a new unsplit leaf node.
- *
- * Note that this use of CarriedTrieNodeRef means that, for internal nodes,
- * if this code is modified in any way we still need
- * to make or fill in the subnodes in key space order.
- */
- private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
- int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
- final int depth = prefix.length;
- // We generate leaves for a single split point as well as for
- // no split points.
- if (depth >= maxDepth || lower >= upper - 1) {
- // If we have two consecutive requests for an unsplit trie node, we
- // can deliver the same one the second time.
- if (lower == upper && ref.content != null) {
- return ref.content;
- }
- TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
- ref.content = lower == upper ? result : null;
- return result;
- }
- InnerTrieNode result = new InnerTrieNode(depth);
- byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
- // append an extra byte on to the prefix
- int currentBound = lower;
- for(int ch = 0; ch < 0xFF; ++ch) {
- trial[depth] = (byte) (ch + 1);
- lower = currentBound;
- while (currentBound < upper) {
- if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
- break;
- }
- currentBound += 1;
- }
- trial[depth] = (byte) ch;
- result.child[0xFF & ch]
- = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
- }
- // pick up the rest
- trial[depth] = (byte)0xFF;
- result.child[0xFF]
- = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
-
- return result;
- }
}
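As with the sampler, the deprecated partitioner above is now a thin shim: configure(JobConf) just forwards to the new setConf(). A sketch of an old-API job plugging it in (key type, reducer count, and split file path are placeholders; the split file must already contain the R-1 sorted keys):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class OldApiPartitionerSketch {            // hypothetical driver
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setNumReduceTasks(4);                    // assumed reducer count
    conf.setMapOutputKeyClass(Text.class);        // assumed key type
    // setPartitionFile(Configuration, Path) is inherited from the new class;
    // JobConf is a Configuration, so the call still resolves.
    TotalOrderPartitioner.setPartitionFile(conf, new Path("_partition.lst"));
    conf.setPartitionerClass(TotalOrderPartitioner.class);
    // At task start the framework calls configure(JobConf), which forwards to
    // setConf() and builds the trie or binary-search index from the split file.
  }
}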
Copied: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?p2=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java&p1=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Mon May 4 04:18:22 2009
@@ -16,82 +16,74 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Utility for collecting samples and writing a partition file for
- * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ * {@link TotalOrderPartitioner}.
*/
-public class InputSampler<K,V> implements Tool {
+public class InputSampler<K,V> extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(InputSampler.class);
static int printUsage() {
System.out.println("sampler -r <reduces>\n" +
- " [-inFormat <input format class>]\n" +
- " [-keyClass <map input & output key class>]\n" +
- " [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
- "// Sample from random splits at random (general)\n" +
- " -splitSample <numSamples> <maxsplits> | " +
- " // Sample from first records in splits (random data)\n"+
- " -splitInterval <double pcnt> <maxsplits>]" +
- " // Sample from splits at intervals (sorted data)");
+ " [-inFormat <input format class>]\n" +
+ " [-keyClass <map input & output key class>]\n" +
+ " [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+ "// Sample from random splits at random (general)\n" +
+ " -splitSample <numSamples> <maxsplits> | " +
+ " // Sample from first records in splits (random data)\n"+
+ " -splitInterval <double pcnt> <maxsplits>]" +
+ " // Sample from splits at intervals (sorted data)");
System.out.println("Default sampler: -splitRandom 0.1 10000 10");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
- private JobConf conf;
-
- public InputSampler(JobConf conf) {
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- this.conf = new JobConf(conf);
- } else {
- this.conf = (JobConf) conf;
- }
+ public InputSampler(Configuration conf) {
+ setConf(conf);
}
/**
- * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+ * Interface to sample using an
+ * {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public interface Sampler<K,V> {
/**
* For a given job, collect and return a subset of the keys from the
* input data.
*/
- K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+ K[] getSample(InputFormat<K,V> inf, Job job)
+ throws IOException, InterruptedException;
}
/**
@@ -128,21 +120,20 @@
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ public K[] getSample(InputFormat<K,V> inf, Job job)
+ throws IOException, InterruptedException {
+ List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
- int splitStep = splits.length / splitsToSample;
+ int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+ int splitStep = splits.size() / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
- job, Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
- samples.add(key);
- key = reader.createKey();
+ RecordReader<K,V> reader = inf.createRecordReader(
+ splits.get(i * splitStep),
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ while (reader.nextKeyValue()) {
+ samples.add(reader.getCurrentKey());
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
@@ -195,35 +186,34 @@
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ public K[] getSample(InputFormat<K,V> inf, Job job)
+ throws IOException, InterruptedException {
+ List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+ int splitsToSample = Math.min(maxSplitsSampled, splits.size());
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
- for (int i = 0; i < splits.length; ++i) {
- InputSplit tmp = splits[i];
- int j = r.nextInt(splits.length);
- splits[i] = splits[j];
- splits[j] = tmp;
+ for (int i = 0; i < splits.size(); ++i) {
+ InputSplit tmp = splits.get(i);
+ int j = r.nextInt(splits.size());
+ splits.set(i, splits.get(j));
+ splits.set(j, tmp);
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i < splitsToSample ||
- (i < splits.length && samples.size() < numSamples); ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
- Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
+ (i < splits.size() && samples.size() < numSamples); ++i) {
+ RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
- samples.add(key);
+ samples.add(reader.getCurrentKey());
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
@@ -231,11 +221,10 @@
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
- samples.set(ind, key);
+ samples.set(ind, reader.getCurrentKey());
}
freq *= (numSamples - 1) / (double) numSamples;
}
- key = reader.createKey();
}
}
reader.close();
@@ -277,24 +266,23 @@
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
- InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ public K[] getSample(InputFormat<K,V> inf, Job job)
+ throws IOException, InterruptedException {
+ List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
- int splitsToSample = Math.min(maxSplitsSampled, splits.length);
- int splitStep = splits.length / splitsToSample;
+ int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+ int splitStep = splits.size() / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
- RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
- job, Reporter.NULL);
- K key = reader.createKey();
- V value = reader.createValue();
- while (reader.next(key, value)) {
+ RecordReader<K,V> reader = inf.createRecordReader(
+ splits.get(i * splitStep),
+ new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
++kept;
- samples.add(key);
- key = reader.createKey();
+ samples.add(reader.getCurrentKey());
}
}
reader.close();
@@ -307,26 +295,27 @@
* Write a partition file for the given job, using the Sampler provided.
* Queries the sampler for a sample keyset, sorts by the output key
* comparator, selects the keys for each rank, and writes to the destination
- * returned from {@link
- org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
+ * returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
- public static <K,V> void writePartitionFile(JobConf job,
- Sampler<K,V> sampler) throws IOException {
- final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
+ public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ Configuration conf = job.getConfiguration();
+ final InputFormat inf =
+ ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = sampler.getSample(inf, job);
LOG.info("Using " + samples.length + " samples");
RawComparator<K> comparator =
- (RawComparator<K>) job.getOutputKeyComparator();
+ (RawComparator<K>) job.getSortComparator();
Arrays.sort(samples, comparator);
- Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
- FileSystem fs = dst.getFileSystem(job);
+ Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+ FileSystem fs = dst.getFileSystem(conf);
if (fs.exists(dst)) {
fs.delete(dst, false);
}
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
- job.getMapOutputKeyClass(), NullWritable.class);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+ conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
int last = -1;
@@ -346,7 +335,7 @@
* Configures a JobConf instance and calls {@link #writePartitionFile}.
*/
public int run(String[] args) throws Exception {
- JobConf job = (JobConf) getConf();
+ Job job = new Job(getConf());
ArrayList<String> otherArgs = new ArrayList<String>();
Sampler<K,V> sampler = null;
for(int i=0; i < args.length; ++i) {
@@ -354,7 +343,7 @@
if ("-r".equals(args[i])) {
job.setNumReduceTasks(Integer.parseInt(args[++i]));
} else if ("-inFormat".equals(args[i])) {
- job.setInputFormat(
+ job.setInputFormatClass(
Class.forName(args[++i]).asSubclass(InputFormat.class));
} else if ("-keyClass".equals(args[i])) {
job.setMapOutputKeyClass(
@@ -400,7 +389,7 @@
}
Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
- TotalOrderPartitioner.setPartitionFile(job, outf);
+ TotalOrderPartitioner.setPartitionFile(getConf(), outf);
for (String s : otherArgs) {
FileInputFormat.addInputPath(job, new Path(s));
}
@@ -410,8 +399,7 @@
}
public static void main(String[] args) throws Exception {
- JobConf job = new JobConf(InputSampler.class);
- InputSampler<?,?> sampler = new InputSampler(job);
+ InputSampler<?,?> sampler = new InputSampler(new Configuration());
int res = ToolRunner.run(sampler, args);
System.exit(res);
}
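The run() and main() methods above also make the new sampler usable as a standalone Tool. A sketch of driving it through ToolRunner (the option values and paths are placeholders chosen to match the usage string printed by printUsage(); the input directory is assumed to exist):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.util.ToolRunner;

public class SamplerToolSketch {                  // hypothetical driver
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // -r sets the reduce count, -splitSample picks the SplitSampler; the last
    // argument is the partition file, everything before it is an input path.
    int res = ToolRunner.run(conf, new InputSampler<Text, NullWritable>(conf),
        new String[] { "-r", "4", "-splitSample", "1000", "10",
                       "input/dir", "_partition.lst" });   // assumed paths
    System.exit(res);
  }
}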
Copied: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (from r771170, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?p2=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java&p1=hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Mon May 4 04:18:22 2009
@@ -16,13 +16,15 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
@@ -30,19 +32,20 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Partitioner effecting a total order by reading split points from
* an externally generated source.
*/
-public class TotalOrderPartitioner<K extends WritableComparable,V>
- implements Partitioner<K,V> {
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+ extends Partitioner<K,V> implements Configurable {
private Node partitions;
public static final String DEFAULT_PATH = "_partition.lst";
+ Configuration conf;
public TotalOrderPartitioner() { }
@@ -54,32 +57,33 @@
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
- * comparator and contain {@link
- org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
+ * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
*/
@SuppressWarnings("unchecked") // keytype from conf not static
- public void configure(JobConf job) {
+ public void setConf(Configuration conf) {
try {
- String parts = getPartitionFile(job);
+ this.conf = conf;
+ String parts = getPartitionFile(conf);
final Path partFile = new Path(parts);
final FileSystem fs = (DEFAULT_PATH.equals(parts))
- ? FileSystem.getLocal(job) // assume in DistributedCache
- : partFile.getFileSystem(job);
+ ? FileSystem.getLocal(conf) // assume in DistributedCache
+ : partFile.getFileSystem(conf);
+ Job job = new Job(conf);
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
- K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
+ K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
if (splitPoints.length != job.getNumReduceTasks() - 1) {
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator =
- (RawComparator<K>) job.getOutputKeyComparator();
+ (RawComparator<K>) job.getSortComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
throw new IOException("Split points are out of order");
}
}
boolean natOrder =
- job.getBoolean("total.order.partitioner.natural.order", true);
+ conf.getBoolean("total.order.partitioner.natural.order", true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
@@ -90,7 +94,7 @@
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
- job.getInt("total.order.partitioner.max.trie.depth", 200));
+ conf.getInt("total.order.partitioner.max.trie.depth", 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
@@ -99,7 +103,11 @@
}
}
- // by construction, we know if our keytype
+ public Configuration getConf() {
+ return conf;
+ }
+
+ // by construction, we know if our keytype
@SuppressWarnings("unchecked") // is memcmp-able and uses the trie
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
@@ -110,16 +118,16 @@
* It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
* keys in the SequenceFile.
*/
- public static void setPartitionFile(JobConf job, Path p) {
- job.set("total.order.partitioner.path", p.toString());
+ public static void setPartitionFile(Configuration conf, Path p) {
+ conf.set("total.order.partitioner.path", p.toString());
}
/**
* Get the path to the SequenceFile storing the sorted partition keyset.
- * @see #setPartitionFile(JobConf,Path)
+ * @see #setPartitionFile(Configuration, Path)
*/
- public static String getPartitionFile(JobConf job) {
- return job.get("total.order.partitioner.path", DEFAULT_PATH);
+ public static String getPartitionFile(Configuration conf) {
+ return conf.get("total.order.partitioner.path", DEFAULT_PATH);
}
/**
@@ -275,14 +283,14 @@
// matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
- JobConf job) throws IOException {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+ Configuration conf) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
ArrayList<K> parts = new ArrayList<K>();
- K key = (K) ReflectionUtils.newInstance(keyClass, job);
+ K key = (K) ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
while (reader.next(key, value)) {
parts.add(key);
- key = (K) ReflectionUtils.newInstance(keyClass, job);
+ key = (K) ReflectionUtils.newInstance(keyClass, conf);
}
reader.close();
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
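Outside of a full job, the new partitioner can be exercised directly through its Configurable contract, which is how the updated unit test below drives it. A sketch, assuming a partition file with R-1 sorted Text keys has already been written (the path, key type, and reduce count are placeholders; the configuration keys are the ones read in the code above and set by the test):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PartitionerProbeSketch {             // hypothetical harness
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class); // map output key
    conf.setInt("mapred.reduce.tasks", 4);        // split file must hold 4 - 1 = 3 keys
    TotalOrderPartitioner.setPartitionFile(conf, new Path(args[0])); // assumed, pre-built file
    // Optional knobs read by setConf():
    //   total.order.partitioner.natural.order  (default true: trie for BinaryComparable keys)
    //   total.order.partitioner.max.trie.depth (default 200)

    TotalOrderPartitioner<Text, NullWritable> part =
        new TotalOrderPartitioner<Text, NullWritable>();
    part.setConf(conf);                           // reads the file, builds trie or search node
    int bucket = part.getPartition(new Text("somekey"), NullWritable.get(), 4);
    System.out.println("partition = " + bucket);
  }
}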
Copied: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java (from r771170, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java)
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?p2=hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java&p1=hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java&r1=771170&r2=771171&rev=771171&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Mon May 4 04:18:22 2009
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred.lib;
+package org.apache.hadoop.mapreduce.lib.partition;
import java.io.IOException;
import java.util.ArrayList;
@@ -24,6 +24,7 @@
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
@@ -33,7 +34,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
public class TestTotalOrderPartitioner extends TestCase {
@@ -75,17 +75,16 @@
testStrings.add(new Check<Text>(new Text("hi"), 6));
};
- private static <T extends WritableComparable> Path writePartitionFile(
- String testname, JobConf conf, T[] splits) throws IOException {
+ private static <T extends WritableComparable<?>> Path writePartitionFile(
+ String testname, Configuration conf, T[] splits) throws IOException {
final FileSystem fs = FileSystem.getLocal(conf);
final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
).makeQualified(fs);
Path p = new Path(testdir, testname + "/_partition.lst");
TotalOrderPartitioner.setPartitionFile(conf, p);
- conf.setNumReduceTasks(splits.length + 1);
+ conf.setInt("mapred.reduce.tasks", splits.length + 1);
SequenceFile.Writer w = null;
try {
- NullWritable nw = NullWritable.get();
w = SequenceFile.createWriter(fs, conf, p,
splits[0].getClass(), NullWritable.class,
SequenceFile.CompressionType.NONE);
@@ -102,39 +101,39 @@
public void testTotalOrderMemCmp() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>();
- JobConf job = new JobConf();
+ Configuration conf = new Configuration();
Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
- "totalordermemcmp", job, splitStrings);
- job.setMapOutputKeyClass(Text.class);
+ "totalordermemcmp", conf, splitStrings);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
try {
- partitioner.configure(job);
+ partitioner.setConf(conf);
NullWritable nw = NullWritable.get();
for (Check<Text> chk : testStrings) {
assertEquals(chk.data.toString(), chk.part,
partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
}
} finally {
- p.getFileSystem(job).delete(p, true);
+ p.getFileSystem(conf).delete(p, true);
}
}
public void testTotalOrderBinarySearch() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>();
- JobConf job = new JobConf();
+ Configuration conf = new Configuration();
Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
- "totalorderbinarysearch", job, splitStrings);
- job.setBoolean("total.order.partitioner.natural.order", false);
- job.setMapOutputKeyClass(Text.class);
+ "totalorderbinarysearch", conf, splitStrings);
+ conf.setBoolean("total.order.partitioner.natural.order", false);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
try {
- partitioner.configure(job);
+ partitioner.setConf(conf);
NullWritable nw = NullWritable.get();
for (Check<Text> chk : testStrings) {
assertEquals(chk.data.toString(), chk.part,
partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
}
} finally {
- p.getFileSystem(job).delete(p, true);
+ p.getFileSystem(conf).delete(p, true);
}
}
@@ -153,14 +152,15 @@
public void testTotalOrderCustomComparator() throws Exception {
TotalOrderPartitioner<Text,NullWritable> partitioner =
new TotalOrderPartitioner<Text,NullWritable>();
- JobConf job = new JobConf();
+ Configuration conf = new Configuration();
Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
Arrays.sort(revSplitStrings, new ReverseStringComparator());
Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
- "totalordercustomcomparator", job, revSplitStrings);
- job.setBoolean("total.order.partitioner.natural.order", false);
- job.setMapOutputKeyClass(Text.class);
- job.setOutputKeyComparatorClass(ReverseStringComparator.class);
+ "totalordercustomcomparator", conf, revSplitStrings);
+ conf.setBoolean("total.order.partitioner.natural.order", false);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ conf.setClass("mapred.output.key.comparator.class",
+ ReverseStringComparator.class, RawComparator.class);
ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
revCheck.add(new Check<Text>(new Text("aaabb"), 9));
@@ -174,15 +174,14 @@
revCheck.add(new Check<Text>(new Text("ddngo"), 4));
revCheck.add(new Check<Text>(new Text("hi"), 3));
try {
- partitioner.configure(job);
+ partitioner.setConf(conf);
NullWritable nw = NullWritable.get();
for (Check<Text> chk : revCheck) {
assertEquals(chk.data.toString(), chk.part,
partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
}
} finally {
- p.getFileSystem(job).delete(p, true);
+ p.getFileSystem(conf).delete(p, true);
}
}
-
}