Posted to commits@hbase.apache.org by dd...@apache.org on 2013/10/28 21:13:21 UTC

svn commit: r1536502 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/ test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/

Author: ddas
Date: Mon Oct 28 20:13:20 2013
New Revision: 1536502

URL: http://svn.apache.org/r1536502
Log:
HBASE-8553. improve unit-test coverage of package org.apache.hadoop.hbase.mapreduce.hadoopbackport. Contributed by Ivan A. Veselovsky.

Added:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java
Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
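
The class modified here is the HBase backport of Hadoop's InputSampler, which writes the split-point file consumed by the backported TotalOrderPartitioner. For context, a minimal sketch of how the two are typically wired into a total-order sort job; the paths, key/value types and sampler parameters below are illustrative assumptions, not taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler;
    import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TotalOrderSortSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "total-order-sort");          // 0.94-era Job constructor
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);                  // assumed Text keys in the input
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(4);                              // sampling only matters with > 1 reducer
        FileInputFormat.addInputPath(job, new Path("/in"));    // hypothetical paths
        FileOutputFormat.setOutputPath(job, new Path("/out"));

        // Register the split-point file with the partitioner. Note the path goes into
        // the job's own Configuration, the same pattern the driver fix below adopts.
        Path partitionFile = new Path("/tmp/_partitions.lst");
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
        job.setPartitionerClass(TotalOrderPartitioner.class);

        // Sample ~1% of the keys (at most 10000 samples from at most 10 splits) and
        // write numReduceTasks - 1 sorted split points to the partition file.
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }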

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java?rev=1536502&r1=1536501&r2=1536502&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java Mon Oct 28 20:13:20 2013
@@ -79,7 +79,7 @@ public class InputSampler<K,V> extends C
   }
 
   /**
-   * Interface to sample using an 
+   * Interface to sample using an
    * {@link org.apache.hadoop.mapreduce.InputFormat}.
    */
   public interface Sampler<K,V> {
@@ -87,7 +87,7 @@ public class InputSampler<K,V> extends C
      * For a given job, collect and return a subset of the keys from the
      * input data.
      */
-    K[] getSample(InputFormat<K,V> inf, Job job) 
+    K[] getSample(InputFormat<K,V> inf, Job job)
     throws IOException, InterruptedException;
   }
 
@@ -125,7 +125,8 @@ public class InputSampler<K,V> extends C
      * From each split sampled, take the first numSamples / numSplits records.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, Job job) 
+    @Override
+    public K[] getSample(InputFormat<K,V> inf, Job job)
         throws IOException, InterruptedException {
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
@@ -160,7 +161,7 @@ public class InputSampler<K,V> extends C
    * here when we should be using native hadoop TotalOrderPartitioner).
    * @param job
    * @return Context
-   * @throws IOException 
+   * @throws IOException
    */
   public static TaskAttemptContext getTaskAttemptContext(final Job job)
   throws IOException {
@@ -218,7 +219,8 @@ public class InputSampler<K,V> extends C
      * the quota of keys from that split is satisfied.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, Job job) 
+    @Override
+    public K[] getSample(InputFormat<K,V> inf, Job job)
         throws IOException, InterruptedException {
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>(numSamples);
@@ -302,7 +304,8 @@ public class InputSampler<K,V> extends C
      * frequency.
      */
     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
-    public K[] getSample(InputFormat<K,V> inf, Job job) 
+    @Override
+    public K[] getSample(InputFormat<K,V> inf, Job job)
         throws IOException, InterruptedException {
       List<InputSplit> splits = inf.getSplits(job);
       ArrayList<K> samples = new ArrayList<K>();
@@ -335,10 +338,10 @@ public class InputSampler<K,V> extends C
    * returned from {@link TotalOrderPartitioner#getPartitionFile}.
    */
   @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
-  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
+  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
       throws IOException, ClassNotFoundException, InterruptedException {
     Configuration conf = job.getConfiguration();
-    final InputFormat inf = 
+    final InputFormat inf =
         ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
     int numPartitions = job.getNumReduceTasks();
     K[] samples = sampler.getSample(inf, job);
@@ -351,7 +354,7 @@ public class InputSampler<K,V> extends C
     if (fs.exists(dst)) {
       fs.delete(dst, false);
     }
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
       conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
     NullWritable nullValue = NullWritable.get();
     float stepSize = samples.length / (float) numPartitions;
@@ -371,6 +374,7 @@ public class InputSampler<K,V> extends C
    * Driver for InputSampler from the command line.
    * Configures a JobConf instance and calls {@link #writePartitionFile}.
    */
+  @Override
   public int run(String[] args) throws Exception {
     Job job = new Job(getConf());
     ArrayList<String> otherArgs = new ArrayList<String>();
@@ -426,8 +430,8 @@ public class InputSampler<K,V> extends C
     }
 
     Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
-    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
-    for (String s : otherArgs) {
+    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), outf);
+    for (String s: otherArgs) {
       FileInputFormat.addInputPath(job, new Path(s));
     }
     InputSampler.<K,V>writePartitionFile(job, sampler);
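
Apart from the @Override annotations and trailing-whitespace trims, the only behavioral change in this file is the last hunk: the driver now writes the partition-file path into job.getConfiguration() rather than into the Tool's own Configuration. Because Job copies the Configuration it is constructed with (Job job = new Job(getConf()) earlier in run()), settings applied to getConf() afterwards never reach the submitted job. A minimal standalone illustration of that copy semantics, using a hypothetical configuration key:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ConfCopySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);                  // Job snapshots conf at construction
        conf.set("example.key", "set-too-late");  // hypothetical key, applied to the wrong object
        System.out.println(job.getConfiguration().get("example.key"));  // prints: null
        job.getConfiguration().set("example.key", "reaches-the-job");   // the fixed pattern
        System.out.println(job.getConfiguration().get("example.key"));  // prints: reaches-the-job
      }
    }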

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * The test is ported from Hadoop branch-0.23 with very small changes.
+ */
+@Category(SmallTests.class)
+public class TestInputSampler {
+
+  static class SequentialSplit extends InputSplit {
+    private int i;
+    SequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+  }
+
+  static class TestInputSamplerIF
+      extends InputFormat<IntWritable,NullWritable> {
+
+    final int maxDepth;
+    final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+    TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      this.maxDepth = maxDepth;
+      assert splitInit.length == numSplits;
+      for (int i = 0; i < numSplits; ++i) {
+        splits.add(new SequentialSplit(splitInit[i]));
+      }
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      return splits;
+    }
+
+    @Override
+    public RecordReader<IntWritable,NullWritable> createRecordReader(
+        final InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new RecordReader<IntWritable,NullWritable>() {
+        private int maxVal;
+        private final IntWritable i = new IntWritable();
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+          i.set(((SequentialSplit)split).getInit() - 1);
+          maxVal = i.get() + maxDepth + 1;
+        }
+        @Override
+        public boolean nextKeyValue() {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable getCurrentKey() { return i; }
+        @Override
+        public NullWritable getCurrentValue() { return NullWritable.get(); }
+        @Override
+        public float getProgress() { return 1.0f; }
+        @Override
+        public void close() { }
+      };
+    }
+
+  }
+
+  /**
+   * Verify SplitSampler contract, that an equal number of records are taken
+   * from the first splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.SplitSampler<IntWritable,NullWritable>(
+          NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Job ignored = new Job();//Job.getInstance();
+    Object[] samples = sampler.getSample(
+        new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
+   * Verify IntervalSampler contract, that samples are taken at regular
+   * intervals from the given splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+          FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = new Job();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+}
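
The values asserted in testSplitSampler follow from the SplitSampler contract quoted in InputSampler.java above (the first splits are sampled and numSamples / numSplits records are taken from each). A hypothetical standalone recomputation, not part of the commit:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SplitSamplerArithmetic {
      public static void main(String[] args) {
        // 5 of the 15 splits are sampled, 25 / 5 = 5 records are read from each,
        // and split i starts counting at i * 5, so the sorted samples are exactly 0..24.
        final int numSplitsSampled = 5, samplesPerSplit = 25 / 5, stepSample = 5;
        List<Integer> samples = new ArrayList<Integer>();
        for (int split = 0; split < numSplitsSampled; ++split) {
          for (int record = 0; record < samplesPerSplit; ++record) {
            samples.add(split * stepSample + record);
          }
        }
        Collections.sort(samples);
        System.out.println(samples);  // [0, 1, 2, ..., 23, 24]
      }
    }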

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests {@link InputSampler} as a {@link Tool}.
+ */
+@Category(SmallTests.class)
+public class TestInputSamplerTool {
+
+  private static final int NUM_REDUCES = 4;
+
+  private static final String input1Str =
+     "2\n"
+    +"...5\n"
+    +"......8\n";
+  private static final String input2Str =
+     "2\n"
+    +".3\n"
+    +"..4\n"
+    +"...5\n"
+    +"....6\n"
+    +".....7\n"
+    +"......8\n"
+    +".......9\n";
+
+  private static File tempDir;
+  private static String input1, input2, output;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    tempDir = FileUtil.createLocalTempFile(
+      new File(FileUtils.getTempDirectory(), TestInputSamplerTool.class.getName() + "-tmp-"),
+      "", false);
+    tempDir.delete();
+    tempDir.mkdirs();
+    assertTrue(tempDir.exists());
+    assertTrue(tempDir.isDirectory());
+    // define files:
+    input1 = tempDir.getAbsolutePath() + "/input1";
+    input2 = tempDir.getAbsolutePath() + "/input2";
+    output = tempDir.getAbsolutePath() + "/output";
+    // create 2 input files:
+    IOUtils.copy(new StringReader(input1Str), new FileOutputStream(input1));
+    IOUtils.copy(new StringReader(input2Str), new FileOutputStream(input2));
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    final File td = tempDir;
+    if (td != null && td.exists()) {
+      FileUtil.fullyDelete(tempDir);
+    }
+  }
+
+  @Test
+  public void testIncorrectParameters() throws Exception {
+    Tool tool = new InputSampler<Object,Object>(new Configuration());
+
+    int result = tool.run(new String[] { "-r" });
+    assertTrue(result != 0);
+
+    result = tool.run(new String[] { "-r", "not-a-number" });
+    assertTrue(result != 0);
+
+    // more than one reducer is required:
+    result = tool.run(new String[] { "-r", "1" });
+    assertTrue(result != 0);
+
+    try {
+      result = tool.run(new String[] { "-inFormat", "java.lang.Object" });
+      fail("ClassCastException expected");
+    } catch (ClassCastException cce) {
+      // expected
+    }
+
+    try {
+      result = tool.run(new String[] { "-keyClass", "java.lang.Object" });
+      fail("ClassCastException expected");
+    } catch (ClassCastException cce) {
+      // expected
+    }
+
+    result = tool.run(new String[] { "-splitSample", "1", });
+    assertTrue(result != 0);
+
+    result = tool.run(new String[] { "-splitRandom", "1.0", "2", "xxx" });
+    assertTrue(result != 0);
+
+    result = tool.run(new String[] { "-splitInterval", "yyy", "5" });
+    assertTrue(result != 0);
+
+    // not enough subsequent arguments:
+    result = tool.run(new String[] { "-r", "2", "-splitInterval", "11.0f", "0", "input" });
+    assertTrue(result != 0);
+  }
+
+  @Test
+  public void testSplitSample() throws Exception {
+    Tool tool = new InputSampler<Object,Object>(new Configuration());
+    int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+        "-splitSample", "10", "100",
+        input1, input2, output });
+    assertEquals(0, result);
+
+    Object[] partitions = readPartitions(output);
+    assertArrayEquals(
+        new LongWritable[] { new LongWritable(2L), new LongWritable(7L), new LongWritable(20L),},
+        partitions);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSplitRamdom() throws Exception {
+    Tool tool = new InputSampler<Object,Object>(new Configuration());
+    int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+        // Use 0.999 probability to reduce the flakiness of the test because
+        // the test will fail if the number of samples is less than (number of reduces + 1).
+        "-splitRandom", "0.999f", "20", "100",
+        input1, input2, output });
+    assertEquals(0, result);
+    Object[] partitions = readPartitions(output);
+    // must be 3 split points since NUM_REDUCES = 4:
+    assertEquals(3, partitions.length);
+    // check that the partition array is sorted:
+    Object[] sortedPartitions = Arrays.copyOf(partitions, partitions.length);
+    Arrays.sort(sortedPartitions, new LongWritable.Comparator());
+    assertArrayEquals(sortedPartitions, partitions);
+  }
+
+  @Test
+  public void testSplitInterval() throws Exception {
+    Tool tool = new InputSampler<Object,Object>(new Configuration());
+    int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+        "-splitInterval", "0.5f", "0",
+        input1, input2, output });
+    assertEquals(0, result);
+    Object[] partitions = readPartitions(output);
+    assertArrayEquals(new LongWritable[] { new LongWritable(7L), new LongWritable(9L),
+      new LongWritable(35L),}, partitions);
+  }
+
+  private Object[] readPartitions(String filePath) throws Exception {
+    Configuration conf = new Configuration();
+    TotalOrderPartitioner.setPartitionFile(conf, new Path(filePath));
+    Object[] partitions = readPartitions(FileSystem.getLocal(conf), new Path(filePath),
+      LongWritable.class, conf);
+    return partitions;
+  }
+
+  private Object[] readPartitions(FileSystem fs, Path p, Class<?> keyClass,
+      Configuration conf) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    ArrayList<Object> parts = new ArrayList<Object>();
+    Writable key = (Writable)ReflectionUtils.newInstance(keyClass, conf);
+    NullWritable value = NullWritable.get();
+    while (reader.next(key, value)) {
+      parts.add(key);
+      key = (Writable)ReflectionUtils.newInstance(keyClass, conf);
+    }
+    reader.close();
+    return parts.toArray((Object[])Array.newInstance(keyClass, parts.size()));
+  }
+}
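
The LongWritable values these tool tests expect (for example {2, 7, 20} in testSplitSample) are byte offsets: with the driver's default TextInputFormat the sampled map keys are the offsets of line starts in input1 and input2, and the partition file always holds numReduceTasks - 1 = 3 of them. A hypothetical helper, not part of the commit, that recomputes the candidate offsets:

    import java.util.ArrayList;
    import java.util.List;

    public class LineOffsetsSketch {
      // Offsets of the first byte of each '\n'-terminated line in the given text.
      static List<Long> lineStartOffsets(String text) {
        List<Long> offsets = new ArrayList<Long>();
        long pos = 0;
        for (String line : text.split("\n")) {
          offsets.add(pos);
          pos += line.length() + 1;   // +1 for the '\n' terminator
        }
        return offsets;
      }

      public static void main(String[] args) {
        // Mirrors input1Str and input2Str from the test above.
        System.out.println(lineStartOffsets("2\n...5\n......8\n"));
        // -> [0, 2, 7]
        System.out.println(lineStartOffsets("2\n.3\n..4\n...5\n....6\n.....7\n......8\n.......9\n"));
        // -> [0, 2, 5, 9, 14, 20, 27, 35]
      }
    }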

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.experimental.categories.Category;
+
+/**
+ * The test is ported from Hadoop branch-0.23 with very small changes.
+ */
+@Category(SmallTests.class)
+public class TestTotalOrderPartitioner extends TestCase {
+
+  private static final Text[] splitStrings = new Text[] {
+    // -inf            // 0
+    new Text("aabbb"), // 1
+    new Text("babbb"), // 2
+    new Text("daddd"), // 3
+    new Text("dddee"), // 4
+    new Text("ddhee"), // 5
+    new Text("dingo"), // 6
+    new Text("hijjj"), // 7
+    new Text("n"),     // 8
+    new Text("yak"),   // 9
+  };
+
+  static class Check<T> {
+    T data;
+    int part;
+    Check(T data, int part) {
+      this.data = data;
+      this.part = part;
+    }
+  }
+
+  private static final ArrayList<Check<Text>> testStrings =
+    new ArrayList<Check<Text>>();
+  static {
+    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+    testStrings.add(new Check<Text>(new Text("aaabb"), 0));
+    testStrings.add(new Check<Text>(new Text("aabbb"), 1));
+    testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+    testStrings.add(new Check<Text>(new Text("babbb"), 2));
+    testStrings.add(new Check<Text>(new Text("baabb"), 1));
+    testStrings.add(new Check<Text>(new Text("yai"), 8));
+    testStrings.add(new Check<Text>(new Text("yak"), 9));
+    testStrings.add(new Check<Text>(new Text("z"), 9));
+    testStrings.add(new Check<Text>(new Text("ddngo"), 5));
+    testStrings.add(new Check<Text>(new Text("hi"), 6));
+  };
+
+  private static <T extends WritableComparable<?>> Path writePartitionFile(
+      String testname, Configuration conf, T[] splits) throws IOException {
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
+                                 ).makeQualified(fs);
+    Path p = new Path(testdir, testname + "/_partition.lst");
+    TotalOrderPartitioner.setPartitionFile(conf, p);
+    conf.setInt("mapreduce.job.reduces", splits.length + 1);
+    SequenceFile.Writer w = null;
+    try {
+      w = SequenceFile.createWriter(fs, conf, p,
+          splits[0].getClass(), NullWritable.class,
+          SequenceFile.CompressionType.NONE);
+      for (int i = 0; i < splits.length; ++i) {
+        w.append(splits[i], NullWritable.get());
+      }
+    } finally {
+      if (null != w)
+        w.close();
+    }
+    return p;
+  }
+
+  public void testTotalOrderMemCmp() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+
+    // Need to use old JobConf-based variant here:
+    JobConf conf = new JobConf();
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setNumReduceTasks(splitStrings.length + 1);
+
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalordermemcmp", conf, splitStrings);
+    try {
+      partitioner.setConf(conf);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
+  public void testTotalOrderBinarySearch() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+    JobConf conf = new JobConf();
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setNumReduceTasks(splitStrings.length + 1);
+
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalorderbinarysearch", conf, splitStrings);
+    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+    try {
+      partitioner.setConf(conf);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : testStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
+  public static class ReverseStringComparator implements RawComparator<Text> {
+    @Override
+    public int compare(Text a, Text b) {
+      return -a.compareTo(b);
+    }
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
+                                                  b2, s2+n2, l2-n2);
+    }
+  }
+
+  public void testTotalOrderCustomComparator() throws Exception {
+    TotalOrderPartitioner<Text,NullWritable> partitioner =
+      new TotalOrderPartitioner<Text,NullWritable>();
+
+    final JobConf conf = new JobConf();
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setNumReduceTasks(splitStrings.length + 1);
+
+    Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+    Arrays.sort(revSplitStrings, new ReverseStringComparator());
+    Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+        "totalordercustomcomparator", conf, revSplitStrings);
+    conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+    conf.setOutputKeyComparatorClass(ReverseStringComparator.class);
+
+    ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
+    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+    revCheck.add(new Check<Text>(new Text("aaabb"), 9));
+    revCheck.add(new Check<Text>(new Text("aabbb"), 9));
+    revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+    revCheck.add(new Check<Text>(new Text("babbb"), 8));
+    revCheck.add(new Check<Text>(new Text("baabb"), 8));
+    revCheck.add(new Check<Text>(new Text("yai"), 1));
+    revCheck.add(new Check<Text>(new Text("yak"), 1));
+    revCheck.add(new Check<Text>(new Text("z"), 0));
+    revCheck.add(new Check<Text>(new Text("ddngo"), 4));
+    revCheck.add(new Check<Text>(new Text("hi"), 3));
+    try {
+      partitioner.setConf(conf);
+      NullWritable nw = NullWritable.get();
+      for (Check<Text> chk : revCheck) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+}