Posted to commits@hbase.apache.org by dd...@apache.org on 2013/10/28 21:13:21 UTC
svn commit: r1536502 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/ test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/
Author: ddas
Date: Mon Oct 28 20:13:20 2013
New Revision: 1536502
URL: http://svn.apache.org/r1536502
Log:
HBASE-8553. improve unit-test coverage of package org.apache.hadoop.hbase.mapreduce.hadoopbackport. Contributed by Ivan A. Veselovsky.
Added:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java?rev=1536502&r1=1536501&r2=1536502&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/InputSampler.java Mon Oct 28 20:13:20 2013
@@ -79,7 +79,7 @@ public class InputSampler<K,V> extends C
}
/**
- * Interface to sample using an
+ * Interface to sample using an
* {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public interface Sampler<K,V> {
@@ -87,7 +87,7 @@ public class InputSampler<K,V> extends C
* For a given job, collect and return a subset of the keys from the
* input data.
*/
- K[] getSample(InputFormat<K,V> inf, Job job)
+ K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException;
}
@@ -125,7 +125,8 @@ public class InputSampler<K,V> extends C
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, Job job)
+ @Override
+ public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
@@ -160,7 +161,7 @@ public class InputSampler<K,V> extends C
* here when we should be using native hadoop TotalOrderPartitioner).
* @param job
* @return Context
- * @throws IOException
+ * @throws IOException
*/
public static TaskAttemptContext getTaskAttemptContext(final Job job)
throws IOException {
@@ -218,7 +219,8 @@ public class InputSampler<K,V> extends C
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, Job job)
+ @Override
+ public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
@@ -302,7 +304,8 @@ public class InputSampler<K,V> extends C
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
- public K[] getSample(InputFormat<K,V> inf, Job job)
+ @Override
+ public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
@@ -335,10 +338,10 @@ public class InputSampler<K,V> extends C
* returned from {@link TotalOrderPartitioner#getPartitionFile}.
*/
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
- public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
+ public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = job.getConfiguration();
- final InputFormat inf =
+ final InputFormat inf =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
int numPartitions = job.getNumReduceTasks();
K[] samples = sampler.getSample(inf, job);
@@ -351,7 +354,7 @@ public class InputSampler<K,V> extends C
if (fs.exists(dst)) {
fs.delete(dst, false);
}
- SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
NullWritable nullValue = NullWritable.get();
float stepSize = samples.length / (float) numPartitions;
@@ -371,6 +374,7 @@ public class InputSampler<K,V> extends C
* Driver for InputSampler from the command line.
* Configures a JobConf instance and calls {@link #writePartitionFile}.
*/
+ @Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
ArrayList<String> otherArgs = new ArrayList<String>();
@@ -426,8 +430,8 @@ public class InputSampler<K,V> extends C
}
Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
- TotalOrderPartitioner.setPartitionFile(getConf(), outf);
- for (String s : otherArgs) {
+ TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), outf);
+ for (String s: otherArgs) {
FileInputFormat.addInputPath(job, new Path(s));
}
InputSampler.<K,V>writePartitionFile(job, sampler);
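Apart from whitespace cleanup and the added @Override annotations, the one behavioral change in this file is in run(): the partition file is now registered on job.getConfiguration() rather than on the Tool's getConf(). Because new Job(conf) takes a copy of the configuration, settings applied to the original conf afterwards never reach the job. A minimal sketch of the distinction (illustrative only, not from the commit; the class name and output path are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;

public class PartitionFileConfSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);  // copies conf; later edits to conf are invisible to the job
    Path outf = new Path("/tmp/_partitions.lst");  // hypothetical path

    // Before the fix: recorded in the Tool's conf, so the job's copy never saw it.
    // TotalOrderPartitioner.setPartitionFile(conf, outf);

    // After the fix: recorded in the configuration the job actually carries,
    // where TotalOrderPartitioner can find it at task time.
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), outf);
  }
}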
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSampler.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * The test is ported from Hadoop branch-0.23 with very small changes.
+ */
+@Category(SmallTests.class)
+public class TestInputSampler {
+
+ static class SequentialSplit extends InputSplit {
+ private int i;
+ SequentialSplit(int i) {
+ this.i = i;
+ }
+ @Override
+ public long getLength() { return 0; }
+ @Override
+ public String[] getLocations() { return new String[0]; }
+ public int getInit() { return i; }
+ }
+
+ static class TestInputSamplerIF
+ extends InputFormat<IntWritable,NullWritable> {
+
+ final int maxDepth;
+ final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+ TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+ this.maxDepth = maxDepth;
+ assert splitInit.length == numSplits;
+ for (int i = 0; i < numSplits; ++i) {
+ splits.add(new SequentialSplit(splitInit[i]));
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context)
+ throws IOException, InterruptedException {
+ return splits;
+ }
+
+ @Override
+ public RecordReader<IntWritable,NullWritable> createRecordReader(
+ final InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new RecordReader<IntWritable,NullWritable>() {
+ private int maxVal;
+ private final IntWritable i = new IntWritable();
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ i.set(((SequentialSplit)split).getInit() - 1);
+ maxVal = i.get() + maxDepth + 1;
+ }
+ @Override
+ public boolean nextKeyValue() {
+ i.set(i.get() + 1);
+ return i.get() < maxVal;
+ }
+ @Override
+ public IntWritable getCurrentKey() { return i; }
+ @Override
+ public NullWritable getCurrentValue() { return NullWritable.get(); }
+ @Override
+ public float getProgress() { return 1.0f; }
+ @Override
+ public void close() { }
+ };
+ }
+
+ }
+
+ /**
+ * Verify SplitSampler contract, that an equal number of records are taken
+ * from the first splits.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testSplitSampler() throws Exception {
+ final int TOT_SPLITS = 15;
+ final int NUM_SPLITS = 5;
+ final int STEP_SAMPLE = 5;
+ final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.SplitSampler<IntWritable,NullWritable>(
+ NUM_SAMPLES, NUM_SPLITS);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i * STEP_SAMPLE;
+ }
+ Job ignored = new Job();//Job.getInstance();
+ Object[] samples = sampler.getSample(
+ new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+ /**
+ * Verify IntervalSampler contract, that samples are taken at regular
+ * intervals from the given splits.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+ public void testIntervalSampler() throws Exception {
+ final int TOT_SPLITS = 16;
+ final int PER_SPLIT_SAMPLE = 4;
+ final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+ final double FREQ = 1.0 / TOT_SPLITS;
+ InputSampler.Sampler<IntWritable,NullWritable> sampler =
+ new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+ FREQ, NUM_SAMPLES);
+ int inits[] = new int[TOT_SPLITS];
+ for (int i = 0; i < TOT_SPLITS; ++i) {
+ inits[i] = i;
+ }
+ Job ignored = new Job();
+ Object[] samples = sampler.getSample(new TestInputSamplerIF(
+ NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+ assertEquals(NUM_SAMPLES, samples.length);
+ Arrays.sort(samples, new IntWritable.Comparator());
+ for (int i = 0; i < NUM_SAMPLES; ++i) {
+ assertEquals(i, ((IntWritable)samples[i]).get());
+ }
+ }
+
+}
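The expected values in testSplitSampler above follow from a small piece of arithmetic; a sketch of the reasoning, assuming the first-N-splits sampling described in SplitSampler's javadoc (illustrative, not from the commit):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SplitSamplerArithmetic {
  public static void main(String[] args) {
    int numSplits = 5, stepSample = 5;
    int numSamples = numSplits * stepSample;  // 25 samples requested
    int perSplit = numSamples / numSplits;    // first 5 records from each sampled split
    List<Integer> expected = new ArrayList<Integer>();
    for (int split = 0; split < numSplits; ++split) {  // only the first 5 of 15 splits
      for (int r = 0; r < perSplit; ++r) {
        expected.add(split * stepSample + r);  // split i yields keys i*5, i*5+1, ...
      }
    }
    Collections.sort(expected);
    System.out.println(expected);  // [0, 1, ..., 24] -- the sequence the test asserts
  }
}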
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestInputSamplerTool.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests {@link InputSampler} as a {@link Tool}.
+ */
+@Category(SmallTests.class)
+public class TestInputSamplerTool {
+
+ private static final int NUM_REDUCES = 4;
+
+ private static final String input1Str =
+ "2\n"
+ +"...5\n"
+ +"......8\n";
+ private static final String input2Str =
+ "2\n"
+ +".3\n"
+ +"..4\n"
+ +"...5\n"
+ +"....6\n"
+ +".....7\n"
+ +"......8\n"
+ +".......9\n";
+
+ private static File tempDir;
+ private static String input1, input2, output;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ tempDir = FileUtil.createLocalTempFile(
+ new File(FileUtils.getTempDirectory(), TestInputSamplerTool.class.getName() + "-tmp-"),
+ "", false);
+ tempDir.delete();
+ tempDir.mkdirs();
+ assertTrue(tempDir.exists());
+ assertTrue(tempDir.isDirectory());
+ // define files:
+ input1 = tempDir.getAbsolutePath() + "/input1";
+ input2 = tempDir.getAbsolutePath() + "/input2";
+ output = tempDir.getAbsolutePath() + "/output";
+ // create 2 input files:
+ IOUtils.copy(new StringReader(input1Str), new FileOutputStream(input1));
+ IOUtils.copy(new StringReader(input2Str), new FileOutputStream(input2));
+ }
+
+ @AfterClass
+ public static void afterClass() throws IOException {
+ final File td = tempDir;
+ if (td != null && td.exists()) {
+ FileUtil.fullyDelete(tempDir);
+ }
+ }
+
+ @Test
+ public void testIncorrectParameters() throws Exception {
+ Tool tool = new InputSampler<Object,Object>(new Configuration());
+
+ int result = tool.run(new String[] { "-r" });
+ assertTrue(result != 0);
+
+ result = tool.run(new String[] { "-r", "not-a-number" });
+ assertTrue(result != 0);
+
+ // more than one reducer is required:
+ result = tool.run(new String[] { "-r", "1" });
+ assertTrue(result != 0);
+
+ try {
+ result = tool.run(new String[] { "-inFormat", "java.lang.Object" });
+ fail("ClassCastException expected");
+ } catch (ClassCastException cce) {
+ // expected
+ }
+
+ try {
+ result = tool.run(new String[] { "-keyClass", "java.lang.Object" });
+ fail("ClassCastException expected");
+ } catch (ClassCastException cce) {
+ // expected
+ }
+
+ result = tool.run(new String[] { "-splitSample", "1", });
+ assertTrue(result != 0);
+
+ result = tool.run(new String[] { "-splitRandom", "1.0", "2", "xxx" });
+ assertTrue(result != 0);
+
+ result = tool.run(new String[] { "-splitInterval", "yyy", "5" });
+ assertTrue(result != 0);
+
+ // not enough subsequent arguments:
+ result = tool.run(new String[] { "-r", "2", "-splitInterval", "11.0f", "0", "input" });
+ assertTrue(result != 0);
+ }
+
+ @Test
+ public void testSplitSample() throws Exception {
+ Tool tool = new InputSampler<Object,Object>(new Configuration());
+ int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+ "-splitSample", "10", "100",
+ input1, input2, output });
+ assertEquals(0, result);
+
+ Object[] partitions = readPartitions(output);
+ assertArrayEquals(
+ new LongWritable[] { new LongWritable(2L), new LongWritable(7L), new LongWritable(20L),},
+ partitions);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSplitRamdom() throws Exception {
+ Tool tool = new InputSampler<Object,Object>(new Configuration());
+ int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+ // Use 0.999 probability to reduce the flakiness of the test because
+ // the test will fail if the number of samples is less than (number of reduces + 1).
+ "-splitRandom", "0.999f", "20", "100",
+ input1, input2, output });
+ assertEquals(0, result);
+ Object[] partitions = readPartitions(output);
+ // must be 3 split points since NUM_REDUCES = 4:
+ assertEquals(3, partitions.length);
+ // check that the partition array is sorted:
+ Object[] sortedPartitions = Arrays.copyOf(partitions, partitions.length);
+ Arrays.sort(sortedPartitions, new LongWritable.Comparator());
+ assertArrayEquals(sortedPartitions, partitions);
+ }
+
+ @Test
+ public void testSplitInterval() throws Exception {
+ Tool tool = new InputSampler<Object,Object>(new Configuration());
+ int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
+ "-splitInterval", "0.5f", "0",
+ input1, input2, output });
+ assertEquals(0, result);
+ Object[] partitions = readPartitions(output);
+ assertArrayEquals(new LongWritable[] { new LongWritable(7L), new LongWritable(9L),
+ new LongWritable(35L),}, partitions);
+ }
+
+ private Object[] readPartitions(String filePath) throws Exception {
+ Configuration conf = new Configuration();
+ TotalOrderPartitioner.setPartitionFile(conf, new Path(filePath));
+ Object[] partitions = readPartitions(FileSystem.getLocal(conf), new Path(filePath),
+ LongWritable.class, conf);
+ return partitions;
+ }
+
+ private Object[] readPartitions(FileSystem fs, Path p, Class<?> keyClass,
+ Configuration conf) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+ ArrayList<Object> parts = new ArrayList<Object>();
+ Writable key = (Writable)ReflectionUtils.newInstance(keyClass, conf);
+ NullWritable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ parts.add(key);
+ key = (Writable)ReflectionUtils.newInstance(keyClass, conf);
+ }
+ reader.close();
+ return parts.toArray((Object[])Array.newInstance(keyClass, parts.size()));
+ }
+}
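For orientation, the option strings these tests pass to run() mirror InputSampler's command-line usage; a hedged sketch of driving the same flow through ToolRunner (class name and paths are hypothetical, not from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.InputSampler;
import org.apache.hadoop.util.ToolRunner;

public class SamplerToolSketch {
  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new Configuration(),
        new InputSampler<Object, Object>(new Configuration()),
        new String[] {
            "-r", "4",                    // 4 reducers => 3 split points in the output
            "-splitSample", "10", "100",  // 10 samples over at most 100 splits
            "/tmp/in1", "/tmp/in2",       // hypothetical input paths
            "/tmp/_partitions.lst" });    // last argument: the partition file to write
    System.exit(rc);
  }
}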
Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java?rev=1536502&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TestTotalOrderPartitioner.java Mon Oct 28 20:13:20 2013
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.experimental.categories.Category;
+
+/**
+ * The test is ported from Hadoop branch-0.23 with very small changes.
+ */
+@Category(SmallTests.class)
+public class TestTotalOrderPartitioner extends TestCase {
+
+ private static final Text[] splitStrings = new Text[] {
+ // -inf // 0
+ new Text("aabbb"), // 1
+ new Text("babbb"), // 2
+ new Text("daddd"), // 3
+ new Text("dddee"), // 4
+ new Text("ddhee"), // 5
+ new Text("dingo"), // 6
+ new Text("hijjj"), // 7
+ new Text("n"), // 8
+ new Text("yak"), // 9
+ };
+
+ static class Check<T> {
+ T data;
+ int part;
+ Check(T data, int part) {
+ this.data = data;
+ this.part = part;
+ }
+ }
+
+ private static final ArrayList<Check<Text>> testStrings =
+ new ArrayList<Check<Text>>();
+ static {
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("aaabb"), 0));
+ testStrings.add(new Check<Text>(new Text("aabbb"), 1));
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("babbb"), 2));
+ testStrings.add(new Check<Text>(new Text("baabb"), 1));
+ testStrings.add(new Check<Text>(new Text("yai"), 8));
+ testStrings.add(new Check<Text>(new Text("yak"), 9));
+ testStrings.add(new Check<Text>(new Text("z"), 9));
+ testStrings.add(new Check<Text>(new Text("ddngo"), 5));
+ testStrings.add(new Check<Text>(new Text("hi"), 6));
+ };
+
+ private static <T extends WritableComparable<?>> Path writePartitionFile(
+ String testname, Configuration conf, T[] splits) throws IOException {
+ final FileSystem fs = FileSystem.getLocal(conf);
+ final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
+ ).makeQualified(fs);
+ Path p = new Path(testdir, testname + "/_partition.lst");
+ TotalOrderPartitioner.setPartitionFile(conf, p);
+ conf.setInt("mapreduce.job.reduces", splits.length + 1);
+ SequenceFile.Writer w = null;
+ try {
+ w = SequenceFile.createWriter(fs, conf, p,
+ splits[0].getClass(), NullWritable.class,
+ SequenceFile.CompressionType.NONE);
+ for (int i = 0; i < splits.length; ++i) {
+ w.append(splits[i], NullWritable.get());
+ }
+ } finally {
+ if (null != w)
+ w.close();
+ }
+ return p;
+ }
+
+ public void testTotalOrderMemCmp() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+
+ // Need to use old JobConf-based variant here:
+ JobConf conf = new JobConf();
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setNumReduceTasks(splitStrings.length + 1);
+
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordermemcmp", conf, splitStrings);
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ public void testTotalOrderBinarySearch() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ JobConf conf = new JobConf();
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setNumReduceTasks(splitStrings.length + 1);
+
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalorderbinarysearch", conf, splitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ public static class ReverseStringComparator implements RawComparator<Text> {
+ @Override
+ public int compare(Text a, Text b) {
+ return -a.compareTo(b);
+ }
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
+ b2, s2+n2, l2-n2);
+ }
+ }
+
+ public void testTotalOrderCustomComparator() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+
+ final JobConf conf = new JobConf();
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setNumReduceTasks(splitStrings.length + 1);
+
+ Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+ Arrays.sort(revSplitStrings, new ReverseStringComparator());
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordercustomcomparator", conf, revSplitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+ conf.setOutputKeyComparatorClass(ReverseStringComparator.class);
+
+ ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("aaabb"), 9));
+ revCheck.add(new Check<Text>(new Text("aabbb"), 9));
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("babbb"), 8));
+ revCheck.add(new Check<Text>(new Text("baabb"), 8));
+ revCheck.add(new Check<Text>(new Text("yai"), 1));
+ revCheck.add(new Check<Text>(new Text("yak"), 1));
+ revCheck.add(new Check<Text>(new Text("z"), 0));
+ revCheck.add(new Check<Text>(new Text("ddngo"), 4));
+ revCheck.add(new Check<Text>(new Text("hi"), 3));
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : revCheck) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+}
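One rule accounts for every expectation in testStrings and revCheck above: with N split points there are N+1 partitions, and a key lands in the partition equal to the number of split points that compare <= the key under the active comparator. A linear restatement of that rule (the class under test uses a binary search or trie; this sketch is illustrative, not the backport's code):

import org.apache.hadoop.io.Text;

public class PartitionRuleSketch {
  // Count the split points <= key; that count is the key's partition index.
  static int partitionOf(Text key, Text[] splitPoints) {
    int partition = 0;
    for (Text split : splitPoints) {
      if (split.compareTo(key) <= 0) {
        partition++;
      }
    }
    return partition;
  }

  public static void main(String[] args) {
    Text[] splits = { new Text("aabbb"), new Text("babbb") };   // 2 points => 3 partitions
    System.out.println(partitionOf(new Text("aaaaa"), splits)); // 0, before both points
    System.out.println(partitionOf(new Text("aabbb"), splits)); // 1, equal counts as past
    System.out.println(partitionOf(new Text("baabb"), splits)); // 1, between the points
    System.out.println(partitionOf(new Text("z"), splits));     // 2, after both points
  }
}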