You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/03 18:14:45 UTC

git commit: CRUNCH-120: Add support for secondary sorts to the in-memory implementation

Updated Branches:
  refs/heads/master 1e06362b9 -> f69aa5d2a


CRUNCH-120: Add support for secondary sorts to the in-memory implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f69aa5d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f69aa5d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f69aa5d2

Branch: refs/heads/master
Commit: f69aa5d2a4d46b04d53566e56e3c64eb7affeb40
Parents: 1e06362
Author: Josh Wills <jw...@apache.org>
Authored: Mon Nov 26 11:01:10 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 3 09:09:25 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/GroupingOptions.java    |    8 +
 .../crunch/impl/mem/collect/MemGroupedTable.java   |   24 +--
 .../apache/crunch/impl/mem/collect/Shuffler.java   |  148 +++++++++++++++
 .../org/apache/crunch/lib/SecondarySortTest.java   |   53 +++++
 4 files changed, 212 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
index e58b666..ea2d6c6 100644
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -51,6 +51,14 @@ public class GroupingOptions {
     return sortComparatorClass;
   }
 
+  /**
+   * Returns the grouping comparator class held by these options; may be {@code null}
+   * when no grouping comparator was configured.
+   */
+  public Class<? extends RawComparator> getGroupingComparatorClass() {
+    return this.groupingComparatorClass;
+  }
+  
+  /**
+   * Returns the partitioner class held by these options; may be {@code null}
+   * when no partitioner was configured.
+   */
+  public Class<? extends Partitioner> getPartitionerClass() {
+    return this.partitionerClass;
+  }
+  
   public void configure(Job job) {
     if (partitionerClass != null) {
       job.setPartitionerClass(partitionerClass);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 1f39632..ee27ecc 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -45,33 +45,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
 
   private final MemTable<K, V> parent;
 
-  private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) {
-    if (options != null && options.getSortComparatorClass() != null) {
-      RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), pipeline.getConfiguration());
-      return new TreeMap<S, Collection<T>>(rc);
-    } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) {
-      return new TreeMap<S, Collection<T>>();
-    }
-    return Maps.newHashMap();
-  }
-
   private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
+    // Groups the parent table in memory: a Shuffler is chosen from the key type and the
+    // grouping options, every materialized (key, value) pair is fed into it, and the
+    // Shuffler itself (an Iterable of (key, values) pairs) is returned as the grouped result.
     PType<S> keyType = parent.getKeyType();
-    Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline());
+    Shuffler<S, T> shuffler = Shuffler.create(keyType, options, parent.getPipeline());
 
     for (Pair<S, T> pair : parent.materialize()) {
-      S key = pair.first();
-      if (!map.containsKey(key)) {
-        map.put(key, Lists.<T> newArrayList());
-      }
-      map.get(key).add(pair.second());
+      shuffler.add(pair);
     }
 
-    List<Pair<S, Iterable<T>>> values = Lists.newArrayList();
-    for (Map.Entry<S, Collection<T>> e : map.entrySet()) {
-      values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue()));
-    }
-    return values;
+    return shuffler;
   }
 
   public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
new file mode 100644
index 0000000..afc04c3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * In-memory versions of common MapReduce patterns for aggregating key-value data.
+ *
+ * <p>A {@code Shuffler} accumulates records through {@link #add(Pair)} and then exposes the
+ * grouped result as an {@code Iterable} of (key, values) pairs.
+ */
+abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
+
+  /**
+   * Adds one key-value record to the grouping held by this shuffler.
+   *
+   * @param record the record to group by its key
+   */
+  public abstract void add(Pair<K, V> record);
+
+  /**
+   * Picks the backing map for a key type: a naturally ordered {@link TreeMap} when the type's
+   * class implements {@link Comparable}, otherwise an unordered hash map (also used when
+   * {@code ptype} is {@code null}).
+   */
+  private static <K, V> Map<K, V> getMapForKeyType(PType<?> ptype) {
+    if (ptype != null && Comparable.class.isAssignableFrom(ptype.getTypeClass())) {
+      return new TreeMap<K, V>();
+    } else {
+      return Maps.newHashMap();
+    }
+  }
+
+  /**
+   * Creates a shuffler appropriate for the given key type and grouping options.
+   *
+   * <ul>
+   * <li>If the key type is a {@link Pair} and a grouping comparator is configured, records are
+   * grouped on the first element of the key and each group's values are ordered by the second
+   * element (a secondary sort). Any configured sort comparator is not consulted in this case.</li>
+   * <li>Otherwise, if a sort comparator is configured, keys are ordered by it.</li>
+   * <li>Otherwise keys are ordered naturally when the key class is {@link Comparable}, and left
+   * unordered when it is not.</li>
+   * </ul>
+   *
+   * @param keyType the key type; may be {@code null}, in which case hash-based grouping is used
+   *     unless a sort comparator is configured
+   * @param options grouping options; may be {@code null}
+   * @param pipeline supplies the configuration used to instantiate the sort comparator
+   * @return a new, empty shuffler
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions options,
+      Pipeline pipeline) {
+    if (options != null) {
+      // keyType may be null (getMapForKeyType tolerates it), so guard before dereferencing it.
+      if (keyType != null && Pair.class.equals(keyType.getTypeClass())
+          && options.getGroupingComparatorClass() != null) {
+        PType<?> pairKey = keyType.getSubTypes().get(0);
+        // Raw construction: S is known to be Pair<K, SK> only at runtime.
+        return new SecondarySortShuffler(getMapForKeyType(pairKey));
+      } else if (options.getSortComparatorClass() != null) {
+        RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
+            pipeline.getConfiguration());
+        return new MapShuffler<S, T>(new TreeMap<S, Collection<T>>(rc));
+      }
+    }
+    Map<S, Collection<T>> map = getMapForKeyType(keyType);
+    return new MapShuffler<S, T>(map);
+  }
+
+  /** Converts a map entry of (key, values) into a (key, values) pair. */
+  private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> {
+    @Override
+    public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) {
+      return Pair.of(input.getKey(), (Iterable<V>) input.getValue());
+    }
+  }
+
+  /** Groups records by key into the supplied map, preserving per-key insertion order of values. */
+  private static class MapShuffler<K, V> extends Shuffler<K, V> {
+    private final Map<K, Collection<V>> map;
+
+    public MapShuffler(Map<K, Collection<V>> map) {
+      this.map = map;
+    }
+
+    @Override
+    public Iterator<Pair<K, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(),
+          new HFunction<K, V>());
+    }
+
+    @Override
+    public void add(Pair<K, V> record) {
+      // Values lists are never stored as null, so a null lookup means "key not seen yet".
+      Collection<V> values = map.get(record.first());
+      if (values == null) {
+        values = Lists.newArrayList();
+        map.put(record.first(), values);
+      }
+      values.add(record.second());
+    }
+  }
+
+  /** Orders (secondary key, value) pairs by the natural ordering of the secondary key. */
+  private static class SecondaryKeyComparator<SK, V> implements Comparator<Pair<SK, V>> {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public int compare(Pair<SK, V> o1, Pair<SK, V> o2) {
+      // Secondary keys are assumed to be Comparable; otherwise this throws ClassCastException.
+      return ((Comparable) o1.first()).compareTo(o2.first());
+    }
+  }
+
+  /**
+   * Converts a primary-key group into a ((primary, secondary), values) pair, with the values
+   * ordered by secondary key.
+   */
+  private static class SSFunction<K, SK, V> implements
+      Function<Map.Entry<K, List<Pair<SK, V>>>, Pair<Pair<K, SK>, Iterable<V>>> {
+    @Override
+    public Pair<Pair<K, SK>, Iterable<V>> apply(Entry<K, List<Pair<SK, V>>> input) {
+      List<Pair<SK, V>> values = input.getValue();
+      // Sorts the group's backing list in place (stable sort, so ties keep insertion order).
+      Collections.sort(values, new SecondaryKeyComparator<SK, V>());
+      // Groups are never empty (add() always appends), so index 0 exists; the reported key
+      // carries the smallest secondary key of the group.
+      Pair<K, SK> key = Pair.of(input.getKey(), values.get(0).first());
+      return Pair.of(key, Iterables.transform(values, new Function<Pair<SK, V>, V>() {
+        @Override
+        public V apply(Pair<SK, V> input) {
+          return input.second();
+        }
+      }));
+    }
+  }
+
+  /**
+   * Groups records whose keys are (primary, secondary) pairs by the primary key only, then
+   * orders each group's values by the secondary key when iterated.
+   */
+  private static class SecondarySortShuffler<K, SK, V> extends Shuffler<Pair<K, SK>, V> {
+
+    private final Map<K, List<Pair<SK, V>>> map;
+
+    public SecondarySortShuffler(Map<K, List<Pair<SK, V>>> map) {
+      this.map = map;
+    }
+
+    @Override
+    public Iterator<Pair<Pair<K, SK>, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(), new SSFunction<K, SK, V>());
+    }
+
+    @Override
+    public void add(Pair<Pair<K, SK>, V> record) {
+      K primary = record.first().first();
+      List<Pair<SK, V>> group = map.get(primary);
+      if (group == null) {
+        group = Lists.<Pair<SK, V>>newArrayList();
+        map.put(primary, group);
+      }
+      group.add(Pair.of(record.first().second(), record.second()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
new file mode 100644
index 0000000..933b986
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * Tests for {@link SecondarySort} running on the in-memory pipeline.
+ */
+public class SecondarySortTest {
+  /**
+   * Expects groups ordered by primary key (100 before 1729) and, within the 1729 group, values
+   * ordered by secondary key (17 -> "a" before 29 -> "c").
+   */
+  @Test
+  public void testInMemory() throws Exception {
+    PTable<Long, Pair<Long, String>> input = MemPipeline.typedTableOf(tableOf(longs(), pairs(longs(), strings())),
+        1729L, Pair.of(17L, "a"), 100L, Pair.of(29L, "b"), 1729L, Pair.of(29L, "c"));
+    PCollection<String> letters = SecondarySort.sortAndApply(input, new StringifyFn(), strings());
+    // materialize() is only typed as Iterable; copy it into a List so equality is well-defined
+    // regardless of the concrete collection the in-memory pipeline happens to return.
+    assertEquals(ImmutableList.of("b", "ac"), ImmutableList.copyOf(letters.materialize()));
+  }
+
+  /** Concatenates the string half of each (secondary key, value) pair in a group, in order. */
+  private static class StringifyFn extends DoFn<Pair<Long, Iterable<Pair<Long, String>>>, String> {
+    @Override
+    public void process(Pair<Long, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+      StringBuilder sb = new StringBuilder();
+      for (Pair<Long, String> p : input.second()) {
+        sb.append(p.second());
+      }
+      emitter.emit(sb.toString());
+    }
+  }
+}