You are viewing a plain-text version of this content; the canonical link to it was a hyperlink and is not preserved in this format.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/10/21 14:51:06 UTC

[2/3] git commit: Secondary sort integration tests

Secondary sort integration tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/db0ce8e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/db0ce8e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/db0ce8e8

Branch: refs/heads/master
Commit: db0ce8e8985c3d0d686898aa4182bdf1c004d7b5
Parents: 521cea2
Author: Josh Wills <jw...@apache.org>
Authored: Sun Oct 14 11:40:28 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 21 05:07:55 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/lib/SecondarySortIT.java     |   65 +++++++++
 crunch/src/it/resources/secondary_sort_input.txt   |    7 +
 .../java/org/apache/crunch/lib/SecondarySort.java  |  101 ++++-----------
 3 files changed, 98 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
new file mode 100644
index 0000000..242f621
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/** Integration test for {@link SecondarySort#sortAndApply} executed on an {@link MRPipeline}. */
+public class SecondarySortIT extends CrunchTestSupport implements Serializable {
+  @Test
+  public void testSecondarySort() throws Exception {
+    Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
+    try { // p.done() in the finally block runs even when the job or the assertion fails
+      String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
+      PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
+          .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
+            @Override
+            public Pair<String, Pair<Integer, Integer>> map(String input) {
+              String[] pieces = input.split(","); // key,v1,v2; fields beyond the third are ignored
+              return Pair.of(pieces[0],
+                  Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
+            }
+          }, tableOf(strings(), pairs(ints(), ints())));
+      Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
+        @Override
+        public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
+          return Joiner.on(',').join(input.first(), Joiner.on(',').join(input.second()));
+        }
+      }, strings()).materialize();
+      assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
+          ImmutableList.copyOf(lines));
+    } finally {
+      p.done();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/resources/secondary_sort_input.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/secondary_sort_input.txt b/crunch/src/it/resources/secondary_sort_input.txt
new file mode 100644
index 0000000..3c7be93
--- /dev/null
+++ b/crunch/src/it/resources/secondary_sort_input.txt
@@ -0,0 +1,7 @@
+one,1,1 
+one,2,-3 
+two,4,5 
+two,2,6 
+two,1,7,9 
+three,0,-1 
+one,-5,10 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
index 5a826fd..ebf7fb4 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -24,6 +24,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.lib.join.JoinUtils;
@@ -32,40 +33,33 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.conf.Configuration;
 
-import com.google.common.collect.ImmutableList;
-
 /**
- * Utilities for performing a secondary sort on a PTable<K, Pair<V1, V2>> instance, i.e., sort on the
- * key and then sort the values by V1.
+ * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>}, i.e., sort on the key and then sort each key's values by V1.
  */
 public class SecondarySort {
-
-  public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) {
-    PTypeFamily ptf = input.getTypeFamily();
-    return sortAndApply(input, new SSUnpackFn<K, V1, V2>(),
-        ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())));
-  }
   
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}.
+   */
   public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input,
       DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) {
-    PTypeFamily ptf = input.getTypeFamily();
-    PType<Pair<V1, V2>> valueType = input.getValueType();
-    PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
-        ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)),
-        valueType);
-    PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(),
-        ptf.collections(input.getValueType()));
-    return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter)
-        .groupByKey(
-            GroupingOptions.builder()
-            .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
-            .partitionerClass(JoinUtils.getPartitionerClass(ptf))
-            .build())
+    return prepare(input)
         .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype);
   }
   
+  /**
+   * Perform a secondary sort on the given {@code PTable} instance and then apply a
+   * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}.
+   */
   public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input,
       DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) {
+    return prepare(input)
+        .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype);
+  }
+  
+  private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare(
+      PTable<K, Pair<V1, V2>> input) {
     PTypeFamily ptf = input.getTypeFamily();
     PType<Pair<V1, V2>> valueType = input.getValueType();
     PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf(
@@ -78,10 +72,16 @@ public class SecondarySort {
             GroupingOptions.builder()
             .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
             .partitionerClass(JoinUtils.getPartitionerClass(ptf))
-            .build())
-        .parallelDo("SecondarySort.apply", new SSTableWrapFn<K, V1, V2, U, V>(doFn), ptype);
+            .build());
   }
   
+  private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
+    @Override
+    public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
+      return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
+    }
+  }  
+
   private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> {
     private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern;
     
@@ -113,54 +113,5 @@ public class SecondarySort {
     public void cleanup(Emitter<T> emitter) {
       intern.cleanup(emitter);
     }
-  }
-  
-  private static class SSTableWrapFn<K, V1, V2, U, V> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, Pair<U, V>> {
-    private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern;
-    
-    public SSTableWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern) {
-      this.intern = intern;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      intern.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      intern.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      intern.setContext(getContext());
-    }
-    
-    @Override
-    public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<Pair<U, V>> emitter) {
-      intern.process(Pair.of(input.first().first(), input.second()), emitter);
-    }
-    
-    @Override
-    public void cleanup(Emitter<Pair<U, V>> emitter) {
-      intern.cleanup(emitter);
-    }
-  }
-  
-  private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
-    @Override
-    public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) {
-      return Pair.of(Pair.of(input.first(), input.second().first()), input.second());
-    }
-  }
-  
-  private static class SSUnpackFn<K, V1, V2> extends
-      MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> {
-    @Override
-    public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) {
-      Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second());
-      return Pair.of(input.first(), c);
-    }
-  }
+  }  
 }