You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ha...@apache.org on 2012/10/09 11:58:21 UTC

svn commit: r1395936 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/h...

Author: harsh
Date: Tue Oct  9 09:58:21 2012
New Revision: 1395936

URL: http://svn.apache.org/viewvc?rev=1395936&view=rev
Log:
MAPREDUCE-4574. Fix TotalOrderPartitioner to work with non-WritableComparable key types. Contributed by Harsh J. (harsh)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1395936&r1=1395935&r2=1395936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct  9 09:58:21 2012
@@ -138,6 +138,9 @@ Trunk (Unreleased)
     MAPREDUCE-4695. Fix LocalRunner on trunk after MAPREDUCE-3223 broke it
     (harsh)
 
+    MAPREDUCE-4574. Fix TotalOrderParitioner to work with
+    non-WritableComparable key types. (harsh)
+
 Release 2.0.3-alpha - Unreleased 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Tue Oct  9 09:58:21 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.Partitio
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+public class TotalOrderPartitioner<K ,V>
     extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
     implements Partitioner<K,V> {
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Tue Oct  9 09:58:21 2012
@@ -47,7 +47,7 @@ import org.apache.hadoop.util.Reflection
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+public class TotalOrderPartitioner<K,V>
     extends Partitioner<K,V> implements Configurable {
 
   private Node partitions;
@@ -298,12 +298,13 @@ public class TotalOrderPartitioner<K ext
   @SuppressWarnings("unchecked") // map output key class
   private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
       Configuration conf) throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(
+        conf,
+        SequenceFile.Reader.file(p));
     ArrayList<K> parts = new ArrayList<K>();
     K key = ReflectionUtils.newInstance(keyClass, conf);
-    NullWritable value = NullWritable.get();
     try {
-      while (reader.next(key, value)) {
+      while ((key = (K) reader.next(key)) != null) {
         parts.add(key);
         key = ReflectionUtils.newInstance(keyClass, conf);
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?rev=1395936&r1=1395935&r2=1395936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Tue Oct  9 09:58:21 2012
@@ -21,19 +21,25 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public class TestTotalOrderPartitioner extends TestCase {
@@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner e
     new Text("yak"),   // 9
   };
 
+  private static final String[] splitJavaStrings = new String[] {
+    // -inf            // 0
+    new String("aabbb"), // 1
+    new String("babbb"), // 2
+    new String("daddd"), // 3
+    new String("dddee"), // 4
+    new String("ddhee"), // 5
+    new String("dingo"), // 6
+    new String("hijjj"), // 7
+    new String("n"),     // 8
+    new String("yak"),   // 9
+  };
+
   static class Check<T> {
     T data;
     int part;
@@ -76,19 +95,41 @@ public class TestTotalOrderPartitioner e
     testStrings.add(new Check<Text>(new Text("hi"), 6));
   };
 
-  private static <T extends WritableComparable<?>> Path writePartitionFile(
+  private static final ArrayList<Check<String>> testJavaStrings =
+      new ArrayList<Check<String>>();
+    static {
+      testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
+      testJavaStrings.add(new Check<String>(new String("aaabb"), 0));
+      testJavaStrings.add(new Check<String>(new String("aabbb"), 1));
+      testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
+      testJavaStrings.add(new Check<String>(new String("babbb"), 2));
+      testJavaStrings.add(new Check<String>(new String("baabb"), 1));
+      testJavaStrings.add(new Check<String>(new String("yai"), 8));
+      testJavaStrings.add(new Check<String>(new String("yak"), 9));
+      testJavaStrings.add(new Check<String>(new String("z"), 9));
+      testJavaStrings.add(new Check<String>(new String("ddngo"), 5));
+      testJavaStrings.add(new Check<String>(new String("hi"), 6));
+    };
+
+
+  private static <T> Path writePartitionFile(
       String testname, Configuration conf, T[] splits) throws IOException {
     final FileSystem fs = FileSystem.getLocal(conf);
     final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
-                                 ).makeQualified(fs);
+                                 ).makeQualified(
+                                     fs.getUri(),
+                                     fs.getWorkingDirectory());
     Path p = new Path(testdir, testname + "/_partition.lst");
     TotalOrderPartitioner.setPartitionFile(conf, p);
     conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
     SequenceFile.Writer w = null;
     try {
-      w = SequenceFile.createWriter(fs, conf, p,
-          splits[0].getClass(), NullWritable.class,
-          SequenceFile.CompressionType.NONE);
+      w = SequenceFile.createWriter(
+          conf,
+          SequenceFile.Writer.file(p),
+          SequenceFile.Writer.keyClass(splits[0].getClass()),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
       for (int i = 0; i < splits.length; ++i) {
         w.append(splits[i], NullWritable.get());
       }
@@ -99,6 +140,31 @@ public class TestTotalOrderPartitioner e
     return p;
   }
 
+  public void testTotalOrderWithCustomSerialization() throws Exception {
+    TotalOrderPartitioner<String, NullWritable> partitioner =
+        new TotalOrderPartitioner<String, NullWritable>();
+    Configuration conf = new Configuration();
+    conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName(),
+        WritableSerialization.class.getName());
+    conf.setClass(MRJobConfig.KEY_COMPARATOR,
+        JavaSerializationComparator.class,
+        Comparator.class);
+    Path p = TestTotalOrderPartitioner.<String>writePartitionFile(
+        "totalordercustomserialization", conf, splitJavaStrings);
+    conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class);
+    try {
+      partitioner.setConf(conf);
+      NullWritable nw = NullWritable.get();
+      for (Check<String> chk : testJavaStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
   public void testTotalOrderMemCmp() throws Exception {
     TotalOrderPartitioner<Text,NullWritable> partitioner =
       new TotalOrderPartitioner<Text,NullWritable>();