Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/20 01:39:10 UTC
svn commit: r697283 - in /hadoop/core/trunk: ./ bin/ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/lib/ src/tools/org/apache/hadoop/tools/
Author: omalley
Date: Fri Sep 19 16:39:09 2008
New Revision: 697283
URL: http://svn.apache.org/viewvc?rev=697283&view=rev
Log:
HADOOP-3019. A new library to support total order partitions.
(cdouglas via omalley)
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/bin/hadoop
hadoop/core/trunk/build.xml
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:39:09 2008
@@ -199,6 +199,9 @@
allow Map-Reduce applications to work with databases. (Fredrik Hedberg and
Enis Soztutar via acmurthy)
+ HADOOP-3019. A new library to support total order partitions.
+ (cdouglas via omalley)
+
IMPROVEMENTS
HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).
Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Fri Sep 19 16:39:09 2008
@@ -122,6 +122,9 @@
if [ -d "$HADOOP_HOME/build/test/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/test/classes
fi
+if [ -d "$HADOOP_HOME/build/tools" ]; then
+ CLASSPATH=${CLASSPATH}:$HADOOP_HOME/build/tools
+fi
# so that filenames w/ spaces are handled correctly in loops below
IFS=
@@ -220,6 +223,10 @@
CLASS=org.apache.hadoop.tools.HadoopArchives
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "sampler" ] ; then
+ CLASS=org.apache.hadoop.tools.InputSampler
+ CLASSPATH=${CLASSPATH}:${TOOL_PATH}
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
else
CLASS=$COMMAND
fi
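
With the two hunks above applied, a build tree's tools classes are on the
classpath and "bin/hadoop sampler" dispatches to
org.apache.hadoop.tools.InputSampler. As a rough sketch (paths here are
hypothetical), a partition file for a four-reduce job could be produced with:

  bin/hadoop sampler -r 4 -splitRandom 0.1 10000 10 in-dir part.lst

The last argument is where the partition file is written; see the
InputSampler usage text at the end of this commit for the full option set.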
Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Fri Sep 19 16:39:09 2008
@@ -422,7 +422,7 @@
</target>
<target name="compile-examples"
- depends="compile-core,compile-c++-examples">
+ depends="compile-core,compile-tools,compile-c++-examples">
<javac
encoding="${build.encoding}"
srcdir="${examples.dir}"
@@ -434,7 +434,10 @@
source="${javac.version}"
deprecation="${javac.deprecation}">
<compilerarg line="${javac.args} ${javac.args.warnings}" />
- <classpath refid="classpath"/>
+ <classpath>
+ <path refid="classpath"/>
+ <pathelement location="${build.tools}"/>
+ </classpath>
</javac>
</target>
@@ -852,6 +855,7 @@
<include name="*/lib/*.jar" />
</fileset>
<pathelement path="${java.class.path}"/>
+ <pathelement location="${build.tools}"/>
</classpath>
<group title="Core" packages="org.apache.*"/>
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=697283&r1=697282&r2=697283&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java Fri Sep 19 16:39:09 2008
@@ -19,10 +19,12 @@
package org.apache.hadoop.examples;
import java.io.IOException;
+import java.net.URI;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
@@ -30,6 +32,8 @@
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.tools.InputSampler;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -43,9 +47,10 @@
* [-outFormat <i>output format class</i>]
* [-outKey <i>output key class</i>]
* [-outValue <i>output value class</i>]
+ * [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
* <i>in-dir</i> <i>out-dir</i>
*/
-public class Sort extends Configured implements Tool {
+public class Sort<K,V> extends Configured implements Tool {
static int printUsage() {
System.out.println("sort [-m <maps>] [-r <reduces>] " +
@@ -53,6 +58,7 @@
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
"[-outValue <output value class>] " +
+ "[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
@@ -87,6 +93,7 @@
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = BytesWritable.class;
List<String> otherArgs = new ArrayList<String>();
+ InputSampler.Sampler<K,V> sampler = null;
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
@@ -105,6 +112,13 @@
} else if ("-outValue".equals(args[i])) {
outputValueClass =
Class.forName(args[++i]).asSubclass(Writable.class);
+ } else if ("-totalOrder".equals(args[i])) {
+ double pcnt = Double.parseDouble(args[++i]);
+ int numSamples = Integer.parseInt(args[++i]);
+ int maxSplits = Integer.parseInt(args[++i]);
+ if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+ sampler =
+ new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
} else {
otherArgs.add(args[i]);
}
@@ -136,6 +150,20 @@
FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
+ if (sampler != null) {
+ System.out.println("Sampling input to effect total-order sort...");
+ jobConf.setPartitionerClass(TotalOrderPartitioner.class);
+ Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
+ inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
+ Path partitionFile = new Path(inputDir, "_sortPartitioning");
+ TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
+ InputSampler.<K,V>writePartitionFile(jobConf, sampler);
+ URI partitionUri = new URI(partitionFile.toString() +
+ "#" + "_sortPartitioning");
+ DistributedCache.addCacheFile(partitionUri, jobConf);
+ DistributedCache.createSymlink(jobConf);
+ }
+
System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Partitioner effecting a total order by reading split points from
+ * an externally generated source.
+ */
+public class TotalOrderPartitioner<K extends WritableComparable,V>
+ implements Partitioner<K,V> {
+
+ private Node partitions;
+ public static final String DEFAULT_PATH = "_partition.lst";
+
+ public TotalOrderPartitioner() { }
+
+ /**
+ * Read in the partition file and build indexing data structures.
+ * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
+ * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
+ * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
+ * will be built. Otherwise, keys will be located using a binary search of
+ * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
+ * defined for this job. The input file must be sorted with the same
+ * comparator and contain {@link
+ * org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
+ */
+ @SuppressWarnings("unchecked") // keytype from conf not static
+ public void configure(JobConf job) {
+ try {
+ String parts = getPartitionFile(job);
+ final Path partFile = new Path(parts);
+ final FileSystem fs = (DEFAULT_PATH.equals(parts))
+ ? FileSystem.getLocal(job) // assume in DistributedCache
+ : partFile.getFileSystem(job);
+
+ Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+ K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
+ if (splitPoints.length != job.getNumReduceTasks() - 1) {
+ throw new IOException("Wrong number of partitions in keyset");
+ }
+ RawComparator<K> comparator =
+ (RawComparator<K>) job.getOutputKeyComparator();
+ for (int i = 0; i < splitPoints.length - 1; ++i) {
+ if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
+ throw new IOException("Split points are out of order");
+ }
+ }
+ boolean natOrder =
+ job.getBoolean("total.order.partitioner.natural.order", true);
+ if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
+ partitions = buildTrie((BinaryComparable[])splitPoints, 0,
+ splitPoints.length, new byte[0],
+ job.getInt("total.order.partitioner.max.trie.depth", 2));
+ } else {
+ partitions = new BinarySearchNode(splitPoints, comparator);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Can't read partitions file", e);
+ }
+ }
+
+ // by construction, we know if our keytype
+ @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
+ public int getPartition(K key, V value, int numPartitions) {
+ return partitions.findPartition(key);
+ }
+
+ /**
+ * Set the path to the SequenceFile storing the sorted partition keyset.
+ * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
+ * keys in the SequenceFile.
+ */
+ public static void setPartitionFile(JobConf job, Path p) {
+ job.set("total.order.partitioner.path", p.toString());
+ }
+
+ /**
+ * Get the path to the SequenceFile storing the sorted partition keyset.
+ * @see #setPartitionFile(JobConf,Path)
+ */
+ public static String getPartitionFile(JobConf job) {
+ return job.get("total.order.partitioner.path", DEFAULT_PATH);
+ }
+
+ /**
+ * Interface to the partitioner to locate a key in the partition keyset.
+ */
+ interface Node<T> {
+ /**
+ * Locate partition in keyset K, such that [Ki..Ki+1) defines a partition,
+ * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+ */
+ int findPartition(T key);
+ }
+
+ /**
+ * Base class for trie nodes. If the keytype is memcomp-able, this builds
+ * tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
+ * bytes.
+ */
+ static abstract class TrieNode implements Node<BinaryComparable> {
+ private final int level;
+ TrieNode(int level) {
+ this.level = level;
+ }
+ int getLevel() {
+ return level;
+ }
+ }
+
+ /**
+ * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
+ * where disabled by <tt>total.order.partitioner.natural.order</tt>,
+ * search the partition keyset with a binary search.
+ */
+ class BinarySearchNode implements Node<K> {
+ private final K[] splitPoints;
+ private final RawComparator<K> comparator;
+ BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+ this.splitPoints = splitPoints;
+ this.comparator = comparator;
+ }
+ public int findPartition(K key) {
+ final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+ return (pos < 0) ? -pos : pos;
+ }
+ }
+
+ /**
+ * An inner trie node that contains 256 children based on the next
+ * character.
+ */
+ class InnerTrieNode extends TrieNode {
+ private TrieNode[] child = new TrieNode[256];
+
+ InnerTrieNode(int level) {
+ super(level);
+ }
+ public int findPartition(BinaryComparable key) {
+ int level = getLevel();
+ if (key.getLength() <= level) {
+ return child[0].findPartition(key);
+ }
+ return child[0xFF & key.getBytes()[level]].findPartition(key);
+ }
+ }
+
+ /**
+ * A leaf trie node that scans for the key between lower..upper.
+ */
+ class LeafTrieNode extends TrieNode {
+ final int lower;
+ final int upper;
+ final BinaryComparable[] splitPoints;
+ LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
+ super(level);
+ this.lower = lower;
+ this.upper = upper;
+ this.splitPoints = splitPoints;
+ }
+ public int findPartition(BinaryComparable key) {
+ final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
+ return (pos < 0) ? -pos : pos;
+ }
+ }
+
+
+ /**
+ * Read the cut points from the given SequenceFile.
+ * @param fs The file system
+ * @param p The path to read
+ * @param keyClass The map output key class
+ * @param job The job config
+ * @throws IOException
+ */
+ // matching key types enforced by passing in
+ @SuppressWarnings("unchecked") // map output key class
+ private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+ JobConf job) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+ ArrayList<K> parts = new ArrayList<K>();
+ K key = (K) ReflectionUtils.newInstance(keyClass, job);
+ NullWritable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ parts.add(key);
+ key = (K) ReflectionUtils.newInstance(keyClass, job);
+ }
+ reader.close();
+ return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+ }
+
+ /**
+ * Given a sorted set of cut points, build a trie that will find the correct
+ * partition quickly.
+ * @param splits the list of cut points
+ * @param lower the lower bound of partitions 0..numPartitions-1
+ * @param upper the upper bound of partitions 0..numPartitions-1
+ * @param prefix the prefix that we have already checked against
+ * @param maxDepth the maximum depth we will build a trie for
+ * @return the trie node that will divide the splits correctly
+ */
+ private TrieNode buildTrie(BinaryComparable[] splits, int lower,
+ int upper, byte[] prefix, int maxDepth) {
+ final int depth = prefix.length;
+ if (depth >= maxDepth || lower == upper) {
+ return new LeafTrieNode(depth, splits, lower, upper);
+ }
+ InnerTrieNode result = new InnerTrieNode(depth);
+ byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+ // append an extra byte on to the prefix
+ int currentBound = lower;
+ for(int ch = 0; ch < 255; ++ch) {
+ trial[depth] = (byte) (ch + 1);
+ lower = currentBound;
+ while (currentBound < upper) {
+ if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+ break;
+ }
+ currentBound += 1;
+ }
+ trial[depth] = (byte) ch;
+ result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
+ maxDepth);
+ }
+ // pick up the rest
+ trial[depth] = 127;
+ result.child[255] = buildTrie(splits, currentBound, upper, trial,
+ maxDepth);
+ return result;
+ }
+}
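
For intuition about the lookup the partitioner performs: with R reduces there
are R-1 sorted split points, a key is assigned the index of the first split
point greater than it, and a key equal to split point i goes to partition
i+1. A standalone sketch of the binary-search arithmetic (plain Java;
hypothetical string keys stand in for BinaryComparable keys):

import java.util.Arrays;

public class SplitLookupSketch {
  // Same arithmetic as TotalOrderPartitioner's BinarySearchNode.
  static int findPartition(String[] splitPoints, String key) {
    final int pos = Arrays.binarySearch(splitPoints, key) + 1;
    return (pos < 0) ? -pos : pos;
  }

  public static void main(String[] args) {
    String[] splits = { "f", "m", "t" };  // 3 cut points => 4 partitions
    System.out.println(findPartition(splits, "apple"));  // 0: before "f"
    System.out.println(findPartition(splits, "f"));      // 1: equal key goes right
    System.out.println(findPartition(splits, "goat"));   // 1: between "f" and "m"
    System.out.println(findPartition(splits, "zebra"));  // 3: after "t"
  }
}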
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestTotalOrderPartitioner.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TestTotalOrderPartitioner extends TestCase {
+
+ private static final Text[] splitStrings = new Text[] {
+ // -inf // 0
+ new Text("aabbb"), // 1
+ new Text("babbb"), // 2
+ new Text("daddd"), // 3
+ new Text("dddee"), // 4
+ new Text("ddhee"), // 5
+ new Text("dingo"), // 6
+ new Text("hijjj"), // 7
+ new Text("n"), // 8
+ new Text("yak"), // 9
+ };
+
+ static class Check<T> {
+ T data;
+ int part;
+ Check(T data, int part) {
+ this.data = data;
+ this.part = part;
+ }
+ }
+
+ private static final ArrayList<Check<Text>> testStrings =
+ new ArrayList<Check<Text>>();
+ static {
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("aaabb"), 0));
+ testStrings.add(new Check<Text>(new Text("aabbb"), 1));
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("babbb"), 2));
+ testStrings.add(new Check<Text>(new Text("baabb"), 1));
+ testStrings.add(new Check<Text>(new Text("yai"), 8));
+ testStrings.add(new Check<Text>(new Text("yak"), 9));
+ testStrings.add(new Check<Text>(new Text("z"), 9));
+ testStrings.add(new Check<Text>(new Text("ddngo"), 5));
+ testStrings.add(new Check<Text>(new Text("hi"), 6));
+ }
+
+ private static <T extends WritableComparable> Path writePartitionFile(
+ String testname, JobConf conf, T[] splits) throws IOException {
+ final FileSystem fs = FileSystem.getLocal(conf);
+ final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
+ ).makeQualified(fs);
+ Path p = new Path(testdir, testname + "/_partition.lst");
+ TotalOrderPartitioner.setPartitionFile(conf, p);
+ conf.setNumReduceTasks(splits.length + 1);
+ SequenceFile.Writer w = null;
+ try {
+ NullWritable nw = NullWritable.get();
+ w = SequenceFile.createWriter(fs, conf, p,
+ splits[0].getClass(), NullWritable.class,
+ SequenceFile.CompressionType.NONE);
+ for (int i = 0; i < splits.length; ++i) {
+ w.append(splits[i], nw);
+ }
+ } finally {
+ if (null != w)
+ w.close();
+ }
+ return p;
+ }
+
+ public void testTotalOrderMemCmp() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ JobConf job = new JobConf();
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordermemcmp", job, splitStrings);
+ job.setMapOutputKeyClass(Text.class);
+ try {
+ partitioner.configure(job);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(job).delete(p);
+ }
+ }
+
+ public void testTotalOrderBinarySearch() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ JobConf job = new JobConf();
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalorderbinarysearch", job, splitStrings);
+ job.setBoolean("total.order.partitioner.natural.order", false);
+ job.setMapOutputKeyClass(Text.class);
+ try {
+ partitioner.configure(job);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(job).delete(p);
+ }
+ }
+
+ public static class ReverseStringComparator implements RawComparator<Text> {
+ public int compare(Text a, Text b) {
+ return -a.compareTo(b);
+ }
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
+ b2, s2+n2, l2-n2);
+ }
+ }
+
+ public void testTotalOrderCustomComparator() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ JobConf job = new JobConf();
+ Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+ Arrays.sort(revSplitStrings, new ReverseStringComparator());
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordercustomcomparator", job, revSplitStrings);
+ job.setBoolean("total.order.partitioner.natural.order", false);
+ job.setMapOutputKeyClass(Text.class);
+ job.setOutputKeyComparatorClass(ReverseStringComparator.class);
+ ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("aaabb"), 9));
+ revCheck.add(new Check<Text>(new Text("aabbb"), 9));
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("babbb"), 8));
+ revCheck.add(new Check<Text>(new Text("baabb"), 8));
+ revCheck.add(new Check<Text>(new Text("yai"), 1));
+ revCheck.add(new Check<Text>(new Text("yak"), 1));
+ revCheck.add(new Check<Text>(new Text("z"), 0));
+ revCheck.add(new Check<Text>(new Text("ddngo"), 4));
+ revCheck.add(new Check<Text>(new Text("hi"), 3));
+ try {
+ partitioner.configure(job);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : revCheck) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(job).delete(p);
+ }
+ }
+
+}
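
A note on the expectations in testTotalOrderCustomComparator: once the keyset
is sorted by ReverseStringComparator, "small" and "large" swap roles, so "z"
precedes every split point and lands in partition 0, while "aaaaa" falls past
the last cut point into partition 9. A standalone sketch of the effect (plain
Java; Collections.reverseOrder stands in for the test's comparator):

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;

public class ReverseOrderSketch {
  public static void main(String[] args) {
    Comparator<String> rev = Collections.reverseOrder();
    String[] splits = { "aabbb", "babbb", "daddd", "dddee", "ddhee",
                        "dingo", "hijjj", "n", "yak" };
    Arrays.sort(splits, rev);  // yak, n, hijjj, ..., aabbb
    // Same arithmetic as the partitioner's binary search.
    int pos = Arrays.binarySearch(splits, "z", rev) + 1;
    System.out.println((pos < 0) ? -pos : pos);  // prints 0
  }
}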
Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java?rev=697283&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/InputSampler.java Fri Sep 19 16:39:09 2008
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
+ */
+public class InputSampler<K,V> implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+ static int printUsage() {
+ System.out.println("sampler -r <reduces>\n" +
+ " [-inFormat <input format class>]\n" +
+ " [-keyClass <map input & output key class>]\n" +
+ " [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+ "// Sample from random splits at random (general)\n" +
+ " -splitSample <numSamples> <maxsplits> | " +
+ " // Sample from first records in splits (random data)\n"+
+ " -splitInterval <double pcnt> <maxsplits>]" +
+ " // Sample from splits at intervals (sorted data)");
+ System.out.println("Default sampler: -splitRandom 0.1 10000 10");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ private JobConf conf;
+
+ public InputSampler(JobConf conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ if (!(conf instanceof JobConf)) {
+ this.conf = new JobConf(conf);
+ } else {
+ this.conf = (JobConf) conf;
+ }
+ }
+
+ /**
+ * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+ */
+ public interface Sampler<K,V> {
+ /**
+ * For a given job, collect and return a subset of the keys from the
+ * input data.
+ */
+ K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+ }
+
+ /**
+ * Samples the first n records from s splits.
+ * Inexpensive way to sample random data.
+ */
+ public static class SplitSampler<K,V> implements Sampler<K,V> {
+
+ private final int numSamples;
+ private final int maxSplitsSampled;
+
+ /**
+ * Create a SplitSampler sampling <em>all</em> splits.
+ * Takes the first numSamples / numSplits records from each split.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ */
+ public SplitSampler(int numSamples) {
+ this(numSamples, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new SplitSampler.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ */
+ public SplitSampler(int numSamples, int maxSplitsSampled) {
+ this.numSamples = numSamples;
+ this.maxSplitsSampled = maxSplitsSampled;
+ }
+
+ /**
+ * From each split sampled, take the first numSamples / numSplits records.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>(numSamples);
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+ int splitStep = splits.length / splitsToSample;
+ int samplesPerSplit = numSamples / splitsToSample;
+ long records = 0;
+ for (int i = 0; i < splitsToSample; ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+ job, Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ samples.add(key);
+ key = reader.createKey();
+ ++records;
+ if ((i+1) * samplesPerSplit <= records) {
+ break;
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
+ /**
+ * Sample from random points in the input.
+ * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+ * each split.
+ */
+ public static class RandomSampler<K,V> implements Sampler<K,V> {
+ private double freq;
+ private final int numSamples;
+ private final int maxSplitsSampled;
+
+ /**
+ * Create a new RandomSampler sampling <em>all</em> splits.
+ * This will read every split at the client, which is very expensive.
+ * @param freq Probability with which a key will be chosen.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ */
+ public RandomSampler(double freq, int numSamples) {
+ this(freq, numSamples, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new RandomSampler.
+ * @param freq Probability with which a key will be chosen.
+ * @param numSamples Total number of samples to obtain from all selected
+ * splits.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ */
+ public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+ this.freq = freq;
+ this.numSamples = numSamples;
+ this.maxSplitsSampled = maxSplitsSampled;
+ }
+
+ /**
+ * Randomize the split order, then take the specified number of keys from
+ * each split sampled, where each key is selected with the specified
+ * probability and possibly replaced by a subsequently selected key when
+ * the quota of keys from that split is satisfied.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>(numSamples);
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.debug("seed: " + seed);
+ // shuffle splits
+ for (int i = 0; i < splits.length; ++i) {
+ InputSplit tmp = splits[i];
+ int j = r.nextInt(splits.length);
+ splits[i] = splits[j];
+ splits[j] = tmp;
+ }
+ // our target rate is in terms of the maximum number of sample splits,
+ // but we accept the possibility of sampling additional splits to hit
+ // the target sample keyset
+ for (int i = 0; i < splitsToSample ||
+ (i < splits.length && samples.size() < numSamples); ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+ Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ if (r.nextDouble() <= freq) {
+ if (samples.size() < numSamples) {
+ samples.add(key);
+ } else {
+ // When exceeding the maximum number of samples, replace a
+ // random element with this one, then adjust the frequency
+ // to reflect the possibility of existing elements being
+ // pushed out
+ int ind = r.nextInt(numSamples);
+ samples.set(ind, key);
+ freq *= (numSamples - 1) / (double) numSamples;
+ }
+ key = reader.createKey();
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
+ /**
+ * Sample from s splits at regular intervals.
+ * Useful for sorted data.
+ */
+ public static class IntervalSampler<K,V> implements Sampler<K,V> {
+ private final double freq;
+ private final int maxSplitsSampled;
+
+ /**
+ * Create a new IntervalSampler sampling <em>all</em> splits.
+ * @param freq The frequency with which records will be emitted.
+ */
+ public IntervalSampler(double freq) {
+ this(freq, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Create a new IntervalSampler.
+ * @param freq The frequency with which records will be emitted.
+ * @param maxSplitsSampled The maximum number of splits to examine.
+ * @see #getSample
+ */
+ public IntervalSampler(double freq, int maxSplitsSampled) {
+ this.freq = freq;
+ this.maxSplitsSampled = maxSplitsSampled;
+ }
+
+ /**
+ * For each split sampled, emit when the ratio of the number of records
+ * retained to the total record count is less than the specified
+ * frequency.
+ */
+ @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+ public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+ InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+ ArrayList<K> samples = new ArrayList<K>();
+ int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+ int splitStep = splits.length / splitsToSample;
+ long records = 0;
+ long kept = 0;
+ for (int i = 0; i < splitsToSample; ++i) {
+ RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+ job, Reporter.NULL);
+ K key = reader.createKey();
+ V value = reader.createValue();
+ while (reader.next(key, value)) {
+ ++records;
+ if ((double) kept / records < freq) {
+ ++kept;
+ samples.add(key);
+ key = reader.createKey();
+ }
+ }
+ reader.close();
+ }
+ return (K[])samples.toArray();
+ }
+ }
+
+ /**
+ * Write a partition file for the given job, using the Sampler provided.
+ * Queries the sampler for a sample keyset, sorts by the output key
+ * comparator, selects the keys for each rank, and writes to the destination
+ * returned from {@link
+ * org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
+ */
+ @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
+ public static <K,V> void writePartitionFile(JobConf job,
+ Sampler<K,V> sampler) throws IOException {
+ final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
+ int numPartitions = job.getNumReduceTasks();
+ K[] samples = sampler.getSample(inf, job);
+ LOG.info("Using " + samples.length + " samples");
+ RawComparator<K> comparator =
+ (RawComparator<K>) job.getOutputKeyComparator();
+ Arrays.sort(samples, comparator);
+ Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
+ FileSystem fs = dst.getFileSystem(job);
+ if (fs.exists(dst)) {
+ fs.delete(dst, false);
+ }
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
+ job.getMapOutputKeyClass(), NullWritable.class);
+ NullWritable nullValue = NullWritable.get();
+ float stepSize = samples.length / (float) numPartitions;
+ int last = -1;
+ for(int i = 1; i < numPartitions; ++i) {
+ int k = Math.round(stepSize * i);
+ while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
+ ++k;
+ }
+ writer.append(samples[k], nullValue);
+ last = k;
+ }
+ writer.close();
+ }
+
+ /**
+ * Driver for InputSampler from the command line.
+ * Configures a JobConf instance and calls {@link #writePartitionFile}.
+ */
+ public int run(String[] args) throws Exception {
+ JobConf job = (JobConf) getConf();
+ ArrayList<String> otherArgs = new ArrayList<String>();
+ Sampler<K,V> sampler = null;
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-r".equals(args[i])) {
+ job.setNumReduceTasks(Integer.parseInt(args[++i]));
+ } else if ("-inFormat".equals(args[i])) {
+ job.setInputFormat(
+ Class.forName(args[++i]).asSubclass(InputFormat.class));
+ } else if ("-keyClass".equals(args[i])) {
+ job.setMapOutputKeyClass(
+ Class.forName(args[++i]).asSubclass(WritableComparable.class));
+ } else if ("-splitSample".equals(args[i])) {
+ int numSamples = Integer.parseInt(args[++i]);
+ int maxSplits = Integer.parseInt(args[++i]);
+ if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+ sampler = new SplitSampler<K,V>(numSamples, maxSplits);
+ } else if ("-splitRandom".equals(args[i])) {
+ double pcnt = Double.parseDouble(args[++i]);
+ int numSamples = Integer.parseInt(args[++i]);
+ int maxSplits = Integer.parseInt(args[++i]);
+ if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+ sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
+ } else if ("-splitInterval".equals(args[i])) {
+ double pcnt = Double.parseDouble(args[++i]);
+ int maxSplits = Integer.parseInt(args[++i]);
+ if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+ sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
+ } else {
+ otherArgs.add(args[i]);
+ }
+ } catch (NumberFormatException except) {
+ System.out.println("ERROR: Integer expected instead of " + args[i]);
+ return printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.out.println("ERROR: Required parameter missing from " +
+ args[i-1]);
+ return printUsage();
+ }
+ }
+ if (job.getNumReduceTasks() <= 1) {
+ System.err.println("Sampler requires more than one reducer");
+ return printUsage();
+ }
+ if (otherArgs.size() < 2) {
+ System.out.println("ERROR: Wrong number of parameters: ");
+ return printUsage();
+ }
+ if (null == sampler) {
+ sampler = new RandomSampler<K,V>(0.1, 10000, 10);
+ }
+
+ Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
+ TotalOrderPartitioner.setPartitionFile(job, outf);
+ ArrayList<Path> plist = new ArrayList<Path>(otherArgs.size());
+ for (String s : otherArgs) {
+ FileInputFormat.addInputPath(job, new Path(s));
+ }
+ InputSampler.<K,V>writePartitionFile(job, sampler);
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ JobConf job = new JobConf(InputSampler.class);
+ InputSampler sampler = new InputSampler(job);
+ int res = ToolRunner.run(sampler, args);
+ System.exit(res);
+ }
+}
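
The cut-point selection in writePartitionFile above reduces to: sort the
sampled keys, then emit the sample at each multiple of samples.length /
numPartitions, nudging past duplicates of the previously emitted key. A
standalone sketch with hypothetical string samples:

public class CutPointSketch {
  public static void main(String[] args) {
    String[] samples = { "a", "b", "b", "b", "c", "d", "e", "f" };  // sorted
    int numPartitions = 4;
    float stepSize = samples.length / (float) numPartitions;  // 2.0
    int last = -1;
    for (int i = 1; i < numPartitions; ++i) {
      int k = Math.round(stepSize * i);
      while (last >= k && samples[last].compareTo(samples[k]) == 0) {
        ++k;  // duplicate of the previous cut point: move right
      }
      System.out.println("cut " + i + ": " + samples[k]);  // b, c, e
      last = k;
    }
  }
}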