You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/03 18:14:45 UTC
git commit: CRUNCH-120: Add support for secondary sorts to the
in-memory implementation
Updated Branches:
refs/heads/master 1e06362b9 -> f69aa5d2a
CRUNCH-120: Add support for secondary sorts to the in-memory implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f69aa5d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f69aa5d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f69aa5d2
Branch: refs/heads/master
Commit: f69aa5d2a4d46b04d53566e56e3c64eb7affeb40
Parents: 1e06362
Author: Josh Wills <jw...@apache.org>
Authored: Mon Nov 26 11:01:10 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 3 09:09:25 2012 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/GroupingOptions.java | 8 +
.../crunch/impl/mem/collect/MemGroupedTable.java | 24 +--
.../apache/crunch/impl/mem/collect/Shuffler.java | 148 +++++++++++++++
.../org/apache/crunch/lib/SecondarySortTest.java | 53 +++++
4 files changed, 212 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
index e58b666..ea2d6c6 100644
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -51,6 +51,14 @@ public class GroupingOptions {
return sortComparatorClass;
}
+  /**
+   * Returns the grouping comparator class held by these options. Callers treat a
+   * {@code null} result as "no grouping comparator configured".
+   */
+  public Class<? extends RawComparator> getGroupingComparatorClass() {
+    return this.groupingComparatorClass;
+  }
+
+  /**
+   * Returns the partitioner class held by these options, or {@code null} when none was set
+   * (in which case {@code configure} leaves the job's partitioner untouched).
+   */
+  public Class<? extends Partitioner> getPartitionerClass() {
+    return this.partitionerClass;
+  }
+
public void configure(Job job) {
if (partitionerClass != null) {
job.setPartitionerClass(partitionerClass);
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 1f39632..ee27ecc 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -45,33 +45,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
private final MemTable<K, V> parent;
- private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) {
- if (options != null && options.getSortComparatorClass() != null) {
- RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), pipeline.getConfiguration());
- return new TreeMap<S, Collection<T>>(rc);
- } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) {
- return new TreeMap<S, Collection<T>>();
- }
- return Maps.newHashMap();
- }
-
+ /**
+ * Groups the records of {@code parent} by key. The grouping strategy (secondary sort,
+ * custom sort comparator, or natural/hash ordering of keys) is chosen by
+ * {@link Shuffler#create} from the parent's key type and the grouping options.
+ *
+ * @return the populated Shuffler, which is itself the Iterable of (key, values) groups
+ */
private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
PType<S> keyType = parent.getKeyType();
- Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline());
+ Shuffler<S, T> shuffler = Shuffler.create(keyType, options, parent.getPipeline());
for (Pair<S, T> pair : parent.materialize()) {
- S key = pair.first();
- if (!map.containsKey(key)) {
- map.put(key, Lists.<T> newArrayList());
- }
- map.get(key).add(pair.second());
+ shuffler.add(pair);
}
- List<Pair<S, Iterable<T>>> values = Lists.newArrayList();
- for (Map.Entry<S, Collection<T>> e : map.entrySet()) {
- values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue()));
- }
- return values;
+ // Shuffler implements Iterable<Pair<K, Iterable<V>>>, so it is returned directly.
+ return shuffler;
}
public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
new file mode 100644
index 0000000..afc04c3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem.collect;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * In-memory versions of common MapReduce patterns for aggregating key-value data.
+ *
+ * <p>A Shuffler collects key-value records through {@link #add(Pair)}. It is then iterated to
+ * obtain one {@code (key, values)} pair per group.
+ */
+abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
+
+  /**
+   * Adds a single key-value record to this shuffler.
+   */
+  public abstract void add(Pair<K, V> record);
+
+  /**
+   * Returns a new, empty map for grouping. The map is a TreeMap (natural key ordering) when the
+   * PType's type class implements {@link Comparable}. Otherwise, including when {@code ptype} is
+   * {@code null}, it is a HashMap with unspecified iteration order.
+   */
+  private static <K, V> Map<K, V> getMapForKeyType(PType<?> ptype) {
+    if (ptype != null && Comparable.class.isAssignableFrom(ptype.getTypeClass())) {
+      return new TreeMap<K, V>();
+    } else {
+      return Maps.newHashMap();
+    }
+  }
+
+  /**
+   * Creates a Shuffler appropriate for the given key type and grouping options.
+   *
+   * <ul>
+   * <li>If the key type is a {@link Pair} and a grouping comparator class is configured, records
+   * are grouped on the first element of the key pair. Within each group they are sorted on the
+   * second element (secondary sort).</li>
+   * <li>Otherwise, if a sort comparator class is configured, keys are ordered by an instance of
+   * that comparator.</li>
+   * <li>Otherwise, keys are ordered naturally when the key type is {@link Comparable}, and held
+   * in a hash map when it is not.</li>
+   * </ul>
+   *
+   * @param keyType the key type of the table being grouped; may be {@code null}
+   * @param options the grouping options; may be {@code null}
+   * @param pipeline the pipeline whose configuration is used to instantiate a sort comparator
+   * @return an empty Shuffler ready to receive records
+   */
+  public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions options,
+      Pipeline pipeline) {
+    if (options != null) {
+      // getMapForKeyType tolerates a null key type, so the secondary-sort check must too.
+      if (keyType != null && Pair.class.equals(keyType.getTypeClass())
+          && options.getGroupingComparatorClass() != null) {
+        PType<?> pairKey = keyType.getSubTypes().get(0);
+        // S is a Pair at runtime, but its element types are not known statically here.
+        Shuffler<?, T> secondary = new SecondarySortShuffler<Object, Object, T>(
+            Shuffler.<Object, List<Pair<Object, T>>>getMapForKeyType(pairKey));
+        @SuppressWarnings("unchecked")
+        Shuffler<S, T> result = (Shuffler<S, T>) secondary;
+        return result;
+      }
+      if (options.getSortComparatorClass() != null) {
+        RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
+            pipeline.getConfiguration());
+        return new MapShuffler<S, T>(new TreeMap<S, Collection<T>>(rc));
+      }
+    }
+    return new MapShuffler<S, T>(Shuffler.<S, Collection<T>>getMapForKeyType(keyType));
+  }
+
+  /** Converts a grouped map entry into a {@code (key, values)} pair. */
+  private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>, Pair<K, Iterable<V>>> {
+    @Override
+    public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>> input) {
+      return Pair.of(input.getKey(), (Iterable<V>) input.getValue());
+    }
+  }
+
+  /** Groups records in a map from each key to the values added for it, in insertion order. */
+  private static class MapShuffler<K, V> extends Shuffler<K, V> {
+    private final Map<K, Collection<V>> map;
+
+    public MapShuffler(Map<K, Collection<V>> map) {
+      this.map = map;
+    }
+
+    @Override
+    public Iterator<Pair<K, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(), new HFunction<K, V>());
+    }
+
+    @Override
+    public void add(Pair<K, V> record) {
+      // Single lookup: this class never stores a null value for a key.
+      Collection<V> values = map.get(record.first());
+      if (values == null) {
+        values = Lists.newArrayList();
+        map.put(record.first(), values);
+      }
+      values.add(record.second());
+    }
+  }
+
+  /**
+   * Converts one primary-key group into a {@code ((primary, secondary), values)} pair. The
+   * group's values are first sorted by secondary key, and the reported secondary key is the
+   * smallest one in the group.
+   */
+  private static class SSFunction<K, SK, V> implements
+      Function<Map.Entry<K, List<Pair<SK, V>>>, Pair<Pair<K, SK>, Iterable<V>>> {
+    @Override
+    public Pair<Pair<K, SK>, Iterable<V>> apply(Entry<K, List<Pair<SK, V>>> input) {
+      List<Pair<SK, V>> values = input.getValue();
+      // Sorts the stored list in place. Secondary keys must be mutually Comparable, otherwise
+      // a ClassCastException is thrown.
+      Collections.sort(values, new Comparator<Pair<SK, V>>() {
+        @Override
+        @SuppressWarnings("unchecked")
+        public int compare(Pair<SK, V> o1, Pair<SK, V> o2) {
+          return ((Comparable<Object>) o1.first()).compareTo(o2.first());
+        }
+      });
+      // A group's list is created by its first add(), so it is never empty here.
+      Pair<K, SK> key = Pair.of(input.getKey(), values.get(0).first());
+      return Pair.of(key, Iterables.transform(values, new Function<Pair<SK, V>, V>() {
+        @Override
+        public V apply(Pair<SK, V> pair) {
+          return pair.second();
+        }
+      }));
+    }
+  }
+
+  /**
+   * Groups {@code ((primary, secondary), value)} records by primary key only. Each group keeps
+   * its {@code (secondary, value)} pairs so they can be sorted when the shuffler is iterated.
+   */
+  private static class SecondarySortShuffler<K, SK, V> extends Shuffler<Pair<K, SK>, V> {
+
+    private final Map<K, List<Pair<SK, V>>> map;
+
+    public SecondarySortShuffler(Map<K, List<Pair<SK, V>>> map) {
+      this.map = map;
+    }
+
+    @Override
+    public Iterator<Pair<Pair<K, SK>, Iterable<V>>> iterator() {
+      return Iterators.transform(map.entrySet().iterator(), new SSFunction<K, SK, V>());
+    }
+
+    @Override
+    public void add(Pair<Pair<K, SK>, V> record) {
+      K primary = record.first().first();
+      List<Pair<SK, V>> group = map.get(primary);
+      if (group == null) {
+        group = Lists.newArrayList();
+        map.put(primary, group);
+      }
+      group.add(Pair.of(record.first().second(), record.second()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f69aa5d2/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
new file mode 100644
index 0000000..933b986
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SecondarySortTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * Tests {@link SecondarySort} against the in-memory pipeline.
+ */
+public class SecondarySortTest {
+  /**
+   * Checks two things for each primary key. Its values must reach the DoFn ordered by secondary
+   * key. Groups must also come out in primary-key order (100 before 1729).
+   */
+  @Test
+  public void testInMemory() throws Exception {
+    PTable<Long, Pair<Long, String>> input = MemPipeline.typedTableOf(
+        tableOf(longs(), pairs(longs(), strings())),
+        1729L, Pair.of(17L, "a"),
+        100L, Pair.of(29L, "b"),
+        1729L, Pair.of(29L, "c"));
+    PCollection<String> letters = SecondarySort.sortAndApply(input, new StringifyFn(), strings());
+    // Copy the output into a List before comparing. List.equals only matches other Lists, so
+    // the assertion must not depend on which Iterable implementation materialize() returns.
+    assertEquals(ImmutableList.of("b", "ac"), ImmutableList.copyOf(letters.materialize()));
+  }
+
+  /** Concatenates the string halves of a group's (secondary key, value) pairs in iteration order. */
+  private static class StringifyFn extends DoFn<Pair<Long, Iterable<Pair<Long, String>>>, String> {
+    @Override
+    public void process(Pair<Long, Iterable<Pair<Long, String>>> input, Emitter<String> emitter) {
+      StringBuilder sb = new StringBuilder();
+      for (Pair<Long, String> p : input.second()) {
+        sb.append(p.second());
+      }
+      emitter.emit(sb.toString());
+    }
+  }
+}