You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/03/24 22:43:06 UTC
git commit: CRUNCH-51: Working total sort implementation w/example
instance for testing the impl.
Updated Branches:
refs/heads/master 5d30af0c0 -> e20cbf089
CRUNCH-51: Working total sort implementation w/example instance for testing the impl.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e20cbf08
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e20cbf08
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e20cbf08
Branch: refs/heads/master
Commit: e20cbf089ddf5c7992c7d5f76f050bea8d9eaecc
Parents: 5d30af0
Author: Josh Wills <jw...@apache.org>
Authored: Wed Mar 13 00:45:07 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Mar 24 13:32:36 2013 -0700
----------------------------------------------------------------------
.../org/apache/crunch/examples/SortExample.java | 53 ++
.../java/org/apache/crunch/GroupingOptions.java | 41 ++-
.../crunch/impl/mem/collect/MemCollection.java | 2 +-
.../crunch/impl/mem/collect/MemGroupedTable.java | 2 +-
.../crunch/impl/mr/collect/PGroupedTableImpl.java | 17 +-
.../crunch/io/avro/AvroFileReaderFactory.java | 11 +-
.../apache/crunch/io/seq/SeqFileReaderFactory.java | 13 +-
.../src/main/java/org/apache/crunch/lib/Sort.java | 460 ++++-----------
.../org/apache/crunch/lib/join/MapsideJoin.java | 30 +-
.../org/apache/crunch/lib/sort/Comparators.java | 187 ++++++
.../java/org/apache/crunch/lib/sort/SortFns.java | 210 +++++++
.../crunch/lib/sort/TotalOrderPartitioner.java | 145 +++++
.../org/apache/crunch/types/PGroupedTableType.java | 4 +-
.../main/java/org/apache/crunch/types/PType.java | 4 +-
.../org/apache/crunch/types/avro/AvroType.java | 4 +-
.../crunch/types/writable/WritableTableType.java | 4 +-
.../apache/crunch/types/writable/WritableType.java | 4 +-
.../java/org/apache/crunch/util/DistCache.java | 23 +-
.../org/apache/crunch/util/PartitionUtils.java | 34 ++
19 files changed, 850 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java b/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
new file mode 100644
index 0000000..a7cd8b7
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/SortExample.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.util.CrunchTool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Simple Crunch tool for running sorting examples from the command line.
+ */
+public class SortExample extends CrunchTool {
+
+  public SortExample() {
+    super();
+  }
+
+  /**
+   * Reads the text file at {@code args[0]}, sorts its contents in ascending order using
+   * {@code args[2]} reducers, and writes the result as text to {@code args[1]}.
+   *
+   * @param args exactly three values: input path, output path, number of reducers
+   * @return 1 if the argument count is wrong, 0 otherwise
+   * @throws NumberFormatException if {@code args[2]} is not a valid integer
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 3) {
+      System.err.println("Usage: <input-path> <output-path> <num-reducers>");
+      return 1;
+    }
+
+    PCollection<String> in = readTextFile(args[0]);
+    // parseInt yields the primitive directly; valueOf would box and immediately unbox.
+    writeTextFile(Sort.sort(in, Integer.parseInt(args[2]), Order.ASCENDING), args[1]);
+    done();
+
+    return 0;
+  }
+
+  /**
+   * Command-line entry point. Exits the JVM with the status returned by {@link #run(String[])}
+   * so that a usage error is visible to the calling shell as a non-zero exit code.
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new Configuration(), new SortExample(), args));
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
index ea2d6c6..4aa1343 100644
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -17,10 +17,16 @@
*/
package org.apache.crunch;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
* Options that can be passed to a {@code groupByKey} operation in order to
* exercise finer control over how the partitioning, grouping, and sorting of
@@ -33,14 +39,18 @@ public class GroupingOptions {
private final Class<? extends RawComparator> groupingComparatorClass;
private final Class<? extends RawComparator> sortComparatorClass;
private final int numReducers;
-
+ private final Map<String, String> extraConf;
+ private final Set<SourceTarget<?>> sourceTargets;
+
private GroupingOptions(Class<? extends Partitioner> partitionerClass,
Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
- int numReducers) {
+ int numReducers, Map<String, String> extraConf, Set<SourceTarget<?>> sourceTargets) {
this.partitionerClass = partitionerClass;
this.groupingComparatorClass = groupingComparatorClass;
this.sortComparatorClass = sortComparatorClass;
this.numReducers = numReducers;
+ this.extraConf = extraConf;
+ this.sourceTargets = sourceTargets;
}
public int getNumReducers() {
@@ -59,6 +69,10 @@ public class GroupingOptions {
return partitionerClass;
}
+ public Set<SourceTarget<?>> getSourceTargets() {
+ return sourceTargets;
+ }
+
public void configure(Job job) {
if (partitionerClass != null) {
job.setPartitionerClass(partitionerClass);
@@ -72,6 +86,9 @@ public class GroupingOptions {
if (numReducers > 0) {
job.setNumReduceTasks(numReducers);
}
+ for (Map.Entry<String, String> e : extraConf.entrySet()) {
+ job.getConfiguration().set(e.getKey(), e.getValue());
+ }
}
public boolean isCompatibleWith(GroupingOptions other) {
@@ -84,6 +101,9 @@ public class GroupingOptions {
if (sortComparatorClass != other.sortComparatorClass) {
return false;
}
+ if (!extraConf.equals(other.extraConf)) {
+ return false;
+ }
return true;
}
@@ -100,7 +120,9 @@ public class GroupingOptions {
private Class<? extends RawComparator> groupingComparatorClass;
private Class<? extends RawComparator> sortComparatorClass;
private int numReducers;
-
+ private Map<String, String> extraConf = Maps.newHashMap();
+ private Set<SourceTarget<?>> sourceTargets = Sets.newHashSet();
+
public Builder() {
}
@@ -127,8 +149,19 @@ public class GroupingOptions {
return this;
}
+ public Builder conf(String confKey, String confValue) {
+ this.extraConf.put(confKey, confValue);
+ return this;
+ }
+
+ public Builder sourceTarget(SourceTarget<?> st) {
+ this.sourceTargets.add(st);
+ return this;
+ }
+
public GroupingOptions build() {
- return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass, numReducers);
+ // Hand the built options their own copies of the map and set: this Builder keeps mutating
+ // its collections on later conf()/sourceTarget() calls, and an already-built
+ // GroupingOptions must not change underneath its holder.
+ return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass,
+ numReducers, Maps.newHashMap(extraConf), Sets.newHashSet(sourceTargets));
}
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index da7c798..c97fac6 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -182,7 +182,7 @@ public class MemCollection<S> implements PCollection<S> {
@Override
public long getSize() {
- return collect.size();
+ return collect.isEmpty() ? 0 : 1; // getSize is only used for pipeline optimization in MR
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index ee27ecc..d105bb4 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -88,7 +88,7 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
@Override
public long getSize() {
- return parent.getSize();
+ return 1; // getSize is only used for pipeline optimization in MR
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 4eb6e9c..ccac5d5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -18,6 +18,7 @@
package org.apache.crunch.impl.mr.collect;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,13 +30,17 @@ import org.apache.crunch.GroupingOptions;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.plan.DoNode;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PType;
+import org.apache.crunch.util.PartitionUtils;
import org.apache.hadoop.mapreduce.Job;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
@@ -59,8 +64,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
public void configureShuffle(Job job) {
ptype.configureShuffle(job, groupingOptions);
if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
- long bytesPerTask = job.getConfiguration().getLong("crunch.bytes.per.reduce.task", (1000L * 1000L * 1000L));
- int numReduceTasks = 1 + (int) (getSize() / bytesPerTask);
+ int numReduceTasks = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
if (numReduceTasks > 0) {
job.setNumReduceTasks(numReduceTasks);
LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks));
@@ -109,6 +113,15 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
}
@Override
+ public Set<SourceTarget<?>> getTargetDependencies() {
+ Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
+ if (groupingOptions != null) {
+ td.addAll(groupingOptions.getSourceTargets());
+ }
+ return ImmutableSet.copyOf(td);
+ }
+
+ @Override
public List<PCollectionImpl<?>> getParents() {
return ImmutableList.<PCollectionImpl<?>> of(parent);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 2f8c1e3..c8fe23a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.avro;
import java.io.IOException;
import java.util.Iterator;
+import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
@@ -29,6 +30,7 @@ import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.avro.AvroType;
@@ -39,7 +41,7 @@ import org.apache.hadoop.fs.Path;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
-class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
@@ -47,10 +49,15 @@ class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
private final MapFn<T, T> mapFn;
public AvroFileReaderFactory(AvroType<T> atype) {
- this.recordReader = AvroFileReaderFactory.createDatumReader(atype);
+ this.recordReader = createDatumReader(atype);
this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
}
+ public AvroFileReaderFactory(Schema schema) {
+ this.recordReader = new GenericDatumReader<T>(schema);
+ this.mapFn = IdentityFn.<T>getInstance();
+ }
+
static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
if (avroType.hasReflect()) {
if (avroType.hasSpecific()) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 2f32746..3f45644 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -23,21 +23,24 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
-class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
+public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
private static final Log LOG = LogFactory.getLog(SeqFileReaderFactory.class);
@@ -59,6 +62,14 @@ class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
}
}
+ public SeqFileReaderFactory(Class clazz) {
+ PType<T> ptype = Writables.writables(clazz);
+ this.converter = ptype.getConverter();
+ this.mapFn = ptype.getInputMapFn();
+ this.key = NullWritable.get();
+ this.value = (Writable) ReflectionUtils.newInstance(clazz, null);
+ }
+
@Override
public Iterator<T> read(FileSystem fs, final Path path) {
mapFn.initialize();
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
index cca5a79..23bcaee 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -17,55 +17,41 @@
*/
package org.apache.crunch.lib;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import static org.apache.crunch.lib.sort.Comparators.*;
+import static org.apache.crunch.lib.sort.SortFns.*;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.reflect.ReflectData;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.GroupingOptions.Builder;
-import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.Tuple;
import org.apache.crunch.Tuple3;
import org.apache.crunch.Tuple4;
import org.apache.crunch.TupleN;
+import org.apache.crunch.lib.sort.TotalOrderPartitioner;
+import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.TupleFactory;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.TupleWritable;
import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.crunch.util.PartitionUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
/**
* Utilities for sorting {@code PCollection} instances.
*/
public class Sort {
+ /**
+ * For signaling the order in which a sort should be done.
+ */
public enum Order {
ASCENDING,
DESCENDING,
@@ -79,8 +65,8 @@ public class Sort {
* </code> Column numbering is 1-based.
*/
public static class ColumnOrder {
- int column;
- Order order;
+ private int column;
+ private Order order;
public ColumnOrder(int column, Order order) {
this.column = column;
@@ -91,6 +77,14 @@ public class Sort {
return new ColumnOrder(column, order);
}
+ public int column() {
+ return column;
+ }
+
+ public Order order() {
+ return order;
+ }
+
@Override
public String toString() {
return "ColumnOrder: column:" + column + ", Order: " + order;
@@ -98,211 +92,79 @@ public class Sort {
}
/**
- * Sorts the {@link PCollection} using the natural ordering of its elements.
+ * Sorts the {@code PCollection} using the natural ordering of its elements in ascending order.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
public static <T> PCollection<T> sort(PCollection<T> collection) {
return sort(collection, Order.ASCENDING);
}
/**
- * Sorts the {@link PCollection} using the natural ordering of its elements in
- * the order specified.
+ * Sorts the {@code PCollection} using the natural order of its elements with the given {@code Order}.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
+ return sort(collection, -1, order);
+ }
+
+ /**
+ * Sorts the {@code PCollection} using the natural ordering of its elements in
+ * the order specified using the given number of reducers.
+ *
+ * @return a {@code PCollection} representing the sorted collection.
+ */
+ public static <T> PCollection<T> sort(PCollection<T> collection, int numReducers, Order order) {
PTypeFamily tf = collection.getTypeFamily();
PTableType<T, Void> type = tf.tableOf(collection.getPType(), tf.nulls());
Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, collection.getPType(), order);
PTable<T, Void> pt = collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
@Override
public void process(T input, Emitter<Pair<T, Void>> emitter) {
emitter.emit(Pair.of(input, (Void) null));
}
}, type);
+ GroupingOptions options = buildGroupingOptions(pt, conf, numReducers, order);
return pt.groupByKey(options).ungroup().keys();
}
/**
- * Sorts the {@link PTable} using the natural ordering of its keys.
+ * Sorts the {@code PTable} using the natural ordering of its keys in ascending order.
*
- * @return a {@link PTable} representing the sorted table.
+ * @return a {@code PTable} representing the sorted table.
*/
public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
return sort(table, Order.ASCENDING);
}
/**
- * Sorts the {@link PTable} using the natural ordering of its keys in the
- * order specified.
- *
- * @return a {@link PTable} representing the sorted collection.
+ * Sorts the {@code PTable} using the natural ordering of its keys with the given {@code Order}.
+ *
+ * @return a {@code PTable} representing the sorted table.
*/
public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) {
- PTypeFamily tf = table.getTypeFamily();
- Configuration conf = table.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, table.getKeyType(), key);
- return table.groupByKey(options).ungroup();
- }
-
- static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
- private final int index;
-
- public SingleKeyFn(int index) {
- this.index = index;
- }
-
- @Override
- public K map(V input) {
- return (K) input.get(index);
- }
- }
-
- static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
- private final int[] indices;
- private final TupleFactory tupleFactory;
-
- public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
- this.indices = indices;
- this.tupleFactory = tupleFactory;
- }
-
- @Override
- public K map(V input) {
- Object[] values = new Object[indices.length];
- for (int i = 0; i < indices.length; i++) {
- values[i] = input.get(indices[i]);
- }
- return (K) tupleFactory.makeTuple(values);
- }
+ return sort(table, -1, key);
}
- static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
-
- private final int[] indices;
- private final String schemaJson;
- private transient Schema schema;
-
- public AvroGenericFn(int[] indices, Schema schema) {
- this.indices = indices;
- this.schemaJson = schema.toString();
- }
-
- @Override
- public void initialize() {
- this.schema = (new Schema.Parser()).parse(schemaJson);
- }
-
- @Override
- public GenericRecord map(V input) {
- GenericRecord rec = new GenericData.Record(schema);
- for (int i = 0; i < indices.length; i++) {
- rec.put(i, input.get(indices[i]));
- }
- return rec;
- }
- }
-
- static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
- // Guarantee each tuple schema has a globally unique name
- String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
- Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
- List<Schema.Field> fields = Lists.newArrayList();
- AvroType<S> parentAvroType = (AvroType<S>) ptype;
- Schema parentAvroSchema = parentAvroType.getSchema();
-
- for (int index = 0; index < orders.length; index++) {
- ColumnOrder columnOrder = orders[index];
- AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
- Schema fieldSchema = atype.getSchema();
- String fieldName = parentAvroSchema.getFields().get(index).name();
- // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
- // Text instances: making this consistent
- Schema.Field.Order order = columnOrder.order == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
- Schema.Field.Order.ASCENDING;
- fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
- }
- schema.setFields(fields);
- return schema;
+ /**
+ * Sorts the {@code PTable} using the natural ordering of its keys in the
+ * order specified with a client-specified number of reducers.
+ *
+ * @return a {@code PTable} representing the sorted table.
+ */
+ public static <K, V> PTable<K, V> sort(PTable<K, V> table, int numReducers, Order key) {
+ Configuration conf = table.getPipeline().getConfiguration();
+ GroupingOptions options = buildGroupingOptions(table, conf, numReducers, key);
+ return table.groupByKey(options).ungroup();
}
- static class KeyExtraction<V extends Tuple> {
-
- private PType<V> ptype;
- private final ColumnOrder[] columnOrder;
- private final int[] cols;
-
- private MapFn<V, Object> byFn;
- private PType<Object> keyPType;
-
- public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
- this.ptype = ptype;
- this.columnOrder = columnOrder;
- this.cols = new int[columnOrder.length];
- for (int i = 0; i < columnOrder.length; i++) {
- cols[i] = columnOrder[i].column - 1;
- }
- init();
- }
-
- private void init() {
- List<PType> pt = ptype.getSubTypes();
- PTypeFamily ptf = ptype.getFamily();
- if (cols.length == 1) {
- byFn = new SingleKeyFn(cols[0]);
- keyPType = pt.get(cols[0]);
- } else {
- TupleFactory tf = null;
- switch (cols.length) {
- case 2:
- tf = TupleFactory.PAIR;
- keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
- break;
- case 3:
- tf = TupleFactory.TUPLE3;
- keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
- break;
- case 4:
- tf = TupleFactory.TUPLE4;
- keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
- break;
- default:
- PType[] pts = new PType[cols.length];
- for (int i = 0; i < pts.length; i++) {
- pts[i] = pt.get(cols[i]);
- }
- tf = TupleFactory.TUPLEN;
- keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
- }
-
- if (ptf == AvroTypeFamily.getInstance()) {
- Schema s = createOrderedTupleSchema(keyPType, columnOrder);
- keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
- byFn = new AvroGenericFn(cols, s);
- } else {
- byFn = new TupleKeyFn(cols, tf);
- }
- }
-
- }
-
- public MapFn<V, Object> getByFn() {
- return byFn;
- }
-
- public PType<Object> getKeyType() {
- return keyPType;
- }
- }
/**
- * Sorts the {@link PCollection} of {@link Pair}s using the specified column
+ * Sorts the {@code PCollection} of {@code Pair}s using the specified column
* ordering.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
ColumnOrder... columnOrders) {
@@ -310,10 +172,10 @@ public class Sort {
}
/**
- * Sorts the {@link PCollection} of {@link Tuple3}s using the specified column
+ * Sorts the {@code PCollection} of {@code Tuple3}s using the specified column
* ordering.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
ColumnOrder... columnOrders) {
@@ -321,10 +183,10 @@ public class Sort {
}
/**
- * Sorts the {@link PCollection} of {@link Tuple4}s using the specified column
+ * Sorts the {@code PCollection} of {@code Tuple4}s using the specified column
* ordering.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
@@ -332,45 +194,59 @@ public class Sort {
}
/**
- * Sorts the {@link PCollection} of {@link TupleN}s using the specified column
- * ordering.
+ * Sorts the {@code PCollection} of tuples using the specified column ordering.
+ *
+ * @return a {@code PCollection} representing the sorted collection.
+ */
+ public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection,
+ ColumnOrder... columnOrders) {
+ return sortTuples(collection, -1, columnOrders);
+ }
+
+ /**
+ * Sorts the {@code PCollection} of tuples using the specified column
+ * ordering and a client-specified number of reducers.
*
- * @return a {@link PCollection} representing the sorted collection.
+ * @return a {@code PCollection} representing the sorted collection.
*/
- public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, ColumnOrder... columnOrders) {
- PTypeFamily tf = collection.getTypeFamily();
+ public static <T extends Tuple> PCollection<T> sortTuples(PCollection<T> collection, int numReducers,
+ ColumnOrder... columnOrders) {
PType<T> pType = collection.getPType();
KeyExtraction<T> ke = new KeyExtraction<T>(pType, columnOrders);
PTable<Object, T> pt = collection.by(ke.getByFn(), ke.getKeyType());
Configuration conf = collection.getPipeline().getConfiguration();
- GroupingOptions options = buildGroupingOptions(conf, tf, ke.getKeyType(), pType, columnOrders);
+ GroupingOptions options = buildGroupingOptions(pt, conf, numReducers, columnOrders);
return pt.groupByKey(options).ungroup().values();
}
// TODO: move to type family?
- private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
- Order order) {
+ private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+ int numReducers, Order order) {
+ PType<K> ptype = ptable.getKeyType();
+ PTypeFamily tf = ptable.getTypeFamily();
Builder builder = GroupingOptions.builder();
if (order == Order.DESCENDING) {
if (tf == WritableTypeFamily.getInstance()) {
builder.sortComparatorClass(ReverseWritableComparator.class);
} else if (tf == AvroTypeFamily.getInstance()) {
- AvroType<T> avroType = (AvroType<T>) ptype;
+ AvroType<K> avroType = (AvroType<K>) ptype;
Schema schema = avroType.getSchema();
- conf.set("crunch.schema", schema.toString());
+ builder.conf("crunch.schema", schema.toString());
builder.sortComparatorClass(ReverseAvroComparator.class);
} else {
throw new RuntimeException("Unrecognized type family: " + tf);
}
+ } else if (tf == AvroTypeFamily.getInstance()) {
+ builder.conf("crunch.schema", ((AvroType<K>) ptype).getSchema().toString());
}
- // TODO:CRUNCH-23: Intermediate Fix for release 1. More elaborate fix is
- // required check JIRA for details.
- builder.numReducers(1);
+ configureReducers(builder, ptable, conf, numReducers);
return builder.build();
}
- private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> keyType,
- PType<?> valueType, ColumnOrder[] columnOrders) {
+ private static <K, V> GroupingOptions buildGroupingOptions(PTable<K, V> ptable, Configuration conf,
+ int numReducers, ColumnOrder[] columnOrders) {
+ PTypeFamily tf = ptable.getTypeFamily();
+ PType<K> keyType = ptable.getKeyType();
Builder builder = GroupingOptions.builder();
if (tf == WritableTypeFamily.getInstance()) {
if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
@@ -380,163 +256,39 @@ public class Sort {
builder.sortComparatorClass(TupleWritableComparator.class);
}
} else if (tf == AvroTypeFamily.getInstance()) {
+ AvroType<K> avroType = (AvroType<K>) keyType;
+ Schema schema = avroType.getSchema();
+ builder.conf("crunch.schema", schema.toString());
if (columnOrders.length == 1 && columnOrders[0].order == Order.DESCENDING) {
- AvroType<T> avroType = (AvroType<T>) keyType;
- Schema schema = avroType.getSchema();
- conf.set("crunch.schema", schema.toString());
builder.sortComparatorClass(ReverseAvroComparator.class);
}
} else {
throw new RuntimeException("Unrecognized type family: " + tf);
}
- // TODO:CRUNCH-23: Intermediate Fix for release 1. More elaborate fix is
- // required check JIRA for details.
- builder.numReducers(1);
+ configureReducers(builder, ptable, conf, numReducers);
return builder.build();
}
- static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-
- RawComparator<T> comparator;
-
- @SuppressWarnings("unchecked")
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- JobConf jobConf = new JobConf(conf);
- comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ private static <K, V> void configureReducers(GroupingOptions.Builder builder,
+ PTable<K, V> ptable, Configuration conf, int numReducers) {
+ if (numReducers <= 0) {
+ numReducers = PartitionUtils.getRecommendedPartitions(ptable, conf);
+ if (numReducers < 5) {
+ // Not worth the overhead, force it to 1
+ numReducers = 1;
}
}
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
- return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
- }
-
- @Override
- public int compare(T o1, T o2) {
- return -comparator.compare(o1, o2);
- }
-
- }
-
- static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
-
- private Schema schema;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- if (conf != null) {
- schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+ builder.numReducers(numReducers);
+ if (numReducers > 1) {
+ Iterable<K> iter = Sample.reservoirSample(ptable.keys(), numReducers - 1).materialize();
+ MaterializableIterable<K> mi = (MaterializableIterable<K>) iter;
+ if (mi.isSourceTarget()) {
+ builder.sourceTarget((SourceTarget) mi.getSource());
}
- }
-
- @Override
- public int compare(T o1, T o2) {
- return -ReflectData.get().compare(o1, o2, schema);
- }
-
- @Override
- public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
- return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
- }
-
- }
-
- static class TupleWritableComparator extends WritableComparator implements Configurable {
-
- private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-
- Configuration conf;
- ColumnOrder[] columnOrders;
-
- public TupleWritableComparator() {
- super(TupleWritable.class, true);
- }
-
- public static void configureOrdering(Configuration conf, Order... orders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
- @Override
- public String apply(Order o) {
- return o.name();
- }
- })));
- }
-
- public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
- conf.set(CRUNCH_ORDERING_PROPERTY,
- Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
- @Override
- public String apply(ColumnOrder o) {
- return o.column + ";" + o.order.name();
- }
- })));
- }
-
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- TupleWritable ta = (TupleWritable) a;
- TupleWritable tb = (TupleWritable) b;
- for (int index = 0; index < columnOrders.length; index++) {
- int order = 1;
- if (columnOrders[index].order == Order.ASCENDING) {
- order = 1;
- } else if (columnOrders[index].order == Order.DESCENDING) {
- order = -1;
- } else { // ignore
- continue;
- }
- if (!ta.has(index) && !tb.has(index)) {
- continue;
- } else if (ta.has(index) && !tb.has(index)) {
- return order;
- } else if (!ta.has(index) && tb.has(index)) {
- return -order;
- } else {
- Writable v1 = ta.get(index);
- Writable v2 = tb.get(index);
- if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
- if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
- int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
- if (cmp != 0) {
- return order * cmp;
- }
- } else {
- int cmp = v1.hashCode() - v2.hashCode();
- if (cmp != 0) {
- return order * cmp;
- }
- }
- }
- }
- }
- return 0; // ordering using specified cols found no differences
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- if (conf != null) {
- String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
- String[] columnOrderNames = ordering.split(",");
- columnOrders = new ColumnOrder[columnOrderNames.length];
- for (int i = 0; i < columnOrders.length; i++) {
- String[] split = columnOrderNames[i].split(";");
- int column = Integer.parseInt(split[0]);
- Order order = Order.valueOf(split[1]);
- columnOrders[i] = ColumnOrder.by(column, order);
-
- }
- }
- }
+ builder.partitionerClass(TotalOrderPartitioner.class);
+ builder.conf(TotalOrderPartitioner.PARTITIONER_PATH, mi.getPath().toString());
+ //TODO: distcache handling
+ }
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index fa28155..56476c1 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -30,9 +30,8 @@ import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.util.DistCache;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.ArrayListMultimap;
@@ -72,7 +71,8 @@ public class MapsideJoin {
if (iterable instanceof MaterializableIterable) {
MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable;
- MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType());
+ MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
+ right.getPType());
ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
if (mi.isSourceTarget()) {
optionsBuilder.sourceTargets((SourceTarget) mi.getSource());
@@ -120,32 +120,24 @@ public class MapsideJoin {
}
private Path getCacheFilePath() {
- Path input = new Path(inputPath);
- try {
- for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
- if (localPath.toString().endsWith(input.getName())) {
- return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
-
- }
- }
- } catch (IOException e) {
- throw new CrunchRuntimeException(e);
+ Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
+ if (local == null) {
+ throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
}
-
- throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+ return local;
}
@Override
public void configure(Configuration conf) {
- DistributedCache.addCacheFile(new Path(inputPath).toUri(), conf);
+ DistCache.addCacheFile(new Path(inputPath), conf);
}
@Override
public void initialize() {
super.initialize();
- ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
- .getDefaultFileSource(getCacheFilePath());
+ ReadableSourceTarget<Pair<K, V>> sourceTarget = ptype.getDefaultFileSource(
+ getCacheFilePath());
Iterable<Pair<K, V>> iterable = null;
try {
iterable = sourceTarget.read(getConfiguration());
@@ -168,7 +160,5 @@ public class MapsideJoin {
emitter.emit(Pair.of(key, valuePair));
}
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
new file mode 100644
index 0000000..ae7f49a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/Comparators.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.Arrays;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * A collection of {@code RawComparator<T>} implementations that are used by Crunch's {@code Sort} library.
+ */
+public class Comparators {
+
+ public static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+ // Reverses the ordering of the WritableComparator registered for the job's map output key class.
+ private RawComparator<T> comparator; // assigned in setConf; remains unset until a non-null conf arrives
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) { // a null conf leaves the delegate untouched
+ JobConf jobConf = new JobConf(conf);
+ comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ }
+ }
+ // Serialized-form comparison: delegate, then negate the result.
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+ return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5); // NOTE(review): negation cannot flip Integer.MIN_VALUE; confirm the delegate never returns it
+ }
+ // Object comparison: delegate, then negate the result.
+ @Override
+ public int compare(T o1, T o2) {
+ return -comparator.compare(o1, o2);
+ }
+ }
+
+ public static class ReverseAvroComparator<T> extends Configured implements RawComparator<AvroKey<T>> {
+ // Descending comparator for AvroKey-wrapped data, driven by the schema stored under "crunch.schema".
+ private Schema schema; // parsed in setConf
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) {
+ schema = (new Schema.Parser()).parse(conf.get("crunch.schema")); // NOTE(review): no fallback when the key is unset; confirm callers always publish it
+ }
+ }
+ // Compares the unwrapped datums through ReflectData and negates the result.
+ @Override
+ public int compare(AvroKey<T> o1, AvroKey<T> o2) {
+ return -ReflectData.get().compare(o1.datum(), o2.datum(), schema);
+ }
+ // Compares the serialized forms through BinaryData and negates the result.
+ @Override
+ public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
+ return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+ }
+ }
+
+ public static class TupleWritableComparator extends WritableComparator implements Configurable {
+ // Orders TupleWritable keys column by column, using the ColumnOrder list parsed from the "crunch.ordering" conf entry.
+ private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+
+ private Configuration conf;
+ private ColumnOrder[] columnOrders; // populated by setConf; compare dereferences it without a null check
+
+ public TupleWritableComparator() {
+ super(TupleWritable.class, true);
+ }
+ // NOTE(review): this overload stores bare order names with no "column;" prefix, which setConf below would fail to parse; confirm whether it is still used.
+ public static void configureOrdering(Configuration conf, Order... orders) {
+ conf.set(CRUNCH_ORDERING_PROPERTY,
+ Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
+ @Override
+ public String apply(Order o) {
+ return o.name();
+ }
+ })));
+ }
+ // Stores comma-separated "column;ORDER" entries, which is the format setConf parses.
+ public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+ conf.set(CRUNCH_ORDERING_PROPERTY,
+ Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
+ @Override
+ public String apply(ColumnOrder o) {
+ return o.column() + ";" + o.order().name();
+ }
+ })));
+ }
+ // Walks the configured columns in sequence; the first column that differs decides the result.
+ @Override
+ public int compare(WritableComparable a, WritableComparable b) {
+ TupleWritable ta = (TupleWritable) a;
+ TupleWritable tb = (TupleWritable) b;
+ for (int index = 0; index < columnOrders.length; index++) { // NOTE(review): tuples are indexed by position in columnOrders, not by ColumnOrder.column(); presumably keys are already projected - confirm
+ int order = 1; // sign multiplier: 1 for ascending, -1 for descending
+ if (columnOrders[index].order() == Order.ASCENDING) {
+ order = 1;
+ } else if (columnOrders[index].order() == Order.DESCENDING) {
+ order = -1;
+ } else { // any other Order value: this column does not take part in the comparison
+ continue;
+ }
+ if (!ta.has(index) && !tb.has(index)) {
+ continue;
+ } else if (ta.has(index) && !tb.has(index)) {
+ return order; // a present value sorts after a missing one when ascending, before it when descending
+ } else if (!ta.has(index) && tb.has(index)) {
+ return -order;
+ } else {
+ Writable v1 = ta.get(index);
+ Writable v2 = tb.get(index);
+ if (v1 != v2 && (v1 != null && !v1.equals(v2))) { // identical or equal values fall through; a null v1 is also treated as no difference
+ if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+ int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+ if (cmp != 0) {
+ return order * cmp;
+ }
+ } else {
+ int cmp = v1.hashCode() - v2.hashCode(); // NOTE(review): int subtraction can overflow, and a null v2 would throw here; confirm inputs
+ if (cmp != 0) {
+ return order * cmp;
+ }
+ }
+ }
+ }
+ }
+ return 0; // ordering using specified cols found no differences
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+ // Keeps the conf and, when it is non-null, rebuilds columnOrders from the "crunch.ordering" entry.
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ if (conf != null) {
+ String ordering = conf.get(CRUNCH_ORDERING_PROPERTY); // NOTE(review): no fallback when the entry is unset; confirm configureOrdering always runs first
+ String[] columnOrderNames = ordering.split(",");
+ columnOrders = new ColumnOrder[columnOrderNames.length];
+ for (int i = 0; i < columnOrders.length; i++) {
+ String[] split = columnOrderNames[i].split(";"); // each entry is expected to be "column;ORDER"
+ int column = Integer.parseInt(split[0]);
+ Order order = Order.valueOf(split[1]);
+ columnOrders[i] = ColumnOrder.by(column, order);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
new file mode 100644
index 0000000..be218f6
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/SortFns.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+
+import com.google.common.collect.Lists;
+
+/**
+ * A set of {@code DoFn}s that are used by Crunch's {@code Sort} library.
+ */
+public class SortFns {
+
+ /**
+ * Extracts a single indexed key from a {@code Tuple} instance: the element at the given index is emitted as the key.
+ */
+ public static class SingleKeyFn<V extends Tuple, K> extends MapFn<V, K> {
+ private final int index; // handed unchanged to Tuple.get in map
+ // Records which tuple element to extract.
+ public SingleKeyFn(int index) {
+ this.index = index;
+ }
+ // Returns the element at index, cast to K.
+ @Override
+ public K map(V input) {
+ return (K) input.get(index); // unchecked cast: K has to match the element's runtime type
+ }
+ }
+
+ /**
+ * Extracts a composite key from a {@code Tuple} instance by copying the selected elements into a new tuple.
+ */
+ public static class TupleKeyFn<V extends Tuple, K extends Tuple> extends MapFn<V, K> {
+ private final int[] indices; // positions read from each input tuple, in key order
+ private final TupleFactory tupleFactory; // builds the key tuple from the extracted values
+ // Stores the element positions and the factory used to assemble the key.
+ public TupleKeyFn(int[] indices, TupleFactory tupleFactory) {
+ this.indices = indices;
+ this.tupleFactory = tupleFactory;
+ }
+ // Gathers input.get(indices[i]) for every i and wraps the values with the factory.
+ @Override
+ public K map(V input) {
+ Object[] values = new Object[indices.length];
+ for (int i = 0; i < indices.length; i++) {
+ values[i] = input.get(indices[i]);
+ }
+ return (K) tupleFactory.makeTuple(values); // unchecked cast: the factory has to produce a K
+ }
+ }
+
+ /**
+ * Copies the selected elements of a {@code Tuple} into a new Avro {@code GenericRecord} that serves as the composite key.
+ */
+ public static class AvroGenericFn<V extends Tuple> extends MapFn<V, GenericRecord> {
+ // The schema is carried as a JSON string; the parsed Schema field is transient and rebuilt in initialize().
+ private final int[] indices; // tuple positions copied into the record, in field order
+ private final String schemaJson; // schema.toString() captured by the constructor
+ private transient Schema schema; // set by initialize(); the constructor does not assign it
+
+ public AvroGenericFn(int[] indices, Schema schema) {
+ this.indices = indices;
+ this.schemaJson = schema.toString();
+ }
+ // Re-parses the JSON form so that map() has a Schema to build records with.
+ @Override
+ public void initialize() {
+ this.schema = (new Schema.Parser()).parse(schemaJson);
+ }
+ // Builds one record per input: record field i receives tuple element indices[i].
+ @Override
+ public GenericRecord map(V input) {
+ GenericRecord rec = new GenericData.Record(schema);
+ for (int i = 0; i < indices.length; i++) {
+ rec.put(i, input.get(indices[i]));
+ }
+ return rec;
+ }
+ }
+
+ /**
+ * Constructs an Avro schema for the given {@code PType<S>} that respects the given column
+ * orderings. Field {@code i} of the result takes its name and schema from position {@code i} of {@code ptype}.
+ */
+ public static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+ // Guarantee each tuple schema has a globally unique name
+ String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+ Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+ List<Schema.Field> fields = Lists.newArrayList();
+ AvroType<S> parentAvroType = (AvroType<S>) ptype; // ptype has to be an AvroType; the cast is unchecked
+ Schema parentAvroSchema = parentAvroType.getSchema();
+ // One field per ColumnOrder; the loop position addresses both ptype's sub-types and its schema fields, and ColumnOrder.column() is not consulted.
+ for (int index = 0; index < orders.length; index++) {
+ ColumnOrder columnOrder = orders[index];
+ AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+ Schema fieldSchema = atype.getSchema();
+ String fieldName = parentAvroSchema.getFields().get(index).name();
+ // Note: avro sorting of strings is inverted relative to how sorting works for WritableComparable
+ // Text instances: making this consistent
+ Schema.Field.Order order = columnOrder.order() == Order.DESCENDING ? Schema.Field.Order.DESCENDING :
+ Schema.Field.Order.ASCENDING; // every order other than DESCENDING maps to ASCENDING
+ fields.add(new Schema.Field(fieldName, fieldSchema, "", null, order));
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ /**
+ * Utility class for encapsulating key extraction logic and serialization information about
+ * key extraction. The extraction function and the key type are both computed once, in the constructor.
+ */
+ public static class KeyExtraction<V extends Tuple> {
+ // Inputs supplied by the caller.
+ private PType<V> ptype; // type of the tuples that keys are extracted from
+ private final ColumnOrder[] columnOrder;
+ private final int[] cols; // columnOrder[i].column() - 1 for each i
+ // Results computed by init().
+ private MapFn<V, Object> byFn;
+ private PType<Object> keyPType;
+
+ public KeyExtraction(PType<V> ptype, ColumnOrder[] columnOrder) {
+ this.ptype = ptype;
+ this.columnOrder = columnOrder;
+ this.cols = new int[columnOrder.length];
+ for (int i = 0; i < columnOrder.length; i++) {
+ cols[i] = columnOrder[i].column() - 1; // column numbers are shifted down by one before being used as tuple indices
+ }
+ init();
+ }
+ // Chooses byFn and keyPType from the number of key columns and the type family of ptype.
+ private void init() {
+ List<PType> pt = ptype.getSubTypes();
+ PTypeFamily ptf = ptype.getFamily();
+ if (cols.length == 1) { // single column: the element itself is the key, with no Avro record wrapping
+ byFn = new SingleKeyFn(cols[0]);
+ keyPType = pt.get(cols[0]);
+ } else {
+ TupleFactory tf = null;
+ switch (cols.length) {
+ case 2:
+ tf = TupleFactory.PAIR;
+ keyPType = ptf.pairs(pt.get(cols[0]), pt.get(cols[1]));
+ break;
+ case 3:
+ tf = TupleFactory.TUPLE3;
+ keyPType = ptf.triples(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]));
+ break;
+ case 4:
+ tf = TupleFactory.TUPLE4;
+ keyPType = ptf.quads(pt.get(cols[0]), pt.get(cols[1]), pt.get(cols[2]), pt.get(cols[3]));
+ break;
+ default: // every remaining length falls back to the general N-ary tuple
+ PType[] pts = new PType[cols.length];
+ for (int i = 0; i < pts.length; i++) {
+ pts[i] = pt.get(cols[i]);
+ }
+ tf = TupleFactory.TUPLEN;
+ keyPType = (PType<Object>) (PType<?>) ptf.tuples(pts);
+ }
+ // Avro keys are re-typed as generic records whose schema carries the per-column sort order.
+ if (ptf == AvroTypeFamily.getInstance()) {
+ Schema s = createOrderedTupleSchema(keyPType, columnOrder);
+ keyPType = (PType<Object>) (PType<?>) Avros.generics(s);
+ byFn = new AvroGenericFn(cols, s);
+ } else {
+ byFn = new TupleKeyFn(cols, tf);
+ }
+ }
+
+ }
+ // Function that maps an input tuple to its sort key.
+ public MapFn<V, Object> getByFn() {
+ return byFn;
+ }
+ // PType describing the keys produced by getByFn().
+ public PType<Object> getKeyType() {
+ return keyPType;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
new file mode 100644
index 0000000..94fbdbe
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/sort/TotalOrderPartitioner.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.sort;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
+import org.apache.crunch.io.seq.SeqFileReaderFactory;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * A partition-aware {@code Partitioner} instance that can work with either Avro or Writable-formatted
+ * keys. Split points are loaded in {@code setConf} and each key is placed by binary search over them.
+ */
+public class TotalOrderPartitioner<K, V> extends Partitioner<K, V> implements Configurable {
+ // Conf key that names the split-point file, and the path used when that key is unset.
+ public static final String DEFAULT_PATH = "_partition.lst";
+ public static final String PARTITIONER_PATH =
+ "crunch.totalorderpartitioner.path";
+
+ private Configuration conf;
+ private Node<K> partitions; // built in setConf; getPartition delegates to it
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+ // Loads the split points from the configured file and builds the lookup structure used by getPartition.
+ @Override
+ public void setConf(Configuration conf) {
+ try {
+ this.conf = conf;
+ String parts = getPartitionFile(conf);
+ final Path partFile = new Path(parts);
+ final FileSystem fs = (DEFAULT_PATH.equals(parts))
+ ? FileSystem.getLocal(conf) // assume in DistributedCache
+ : partFile.getFileSystem(conf);
+
+ Job job = new Job(conf); // consulted only for the key class, sort comparator and reducer count
+ Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+ RawComparator<K> comparator =
+ (RawComparator<K>) job.getSortComparator();
+ K[] splitPoints = readPartitions(fs, partFile, keyClass, conf, comparator);
+ int numReduceTasks = job.getNumReduceTasks();
+ if (splitPoints.length != numReduceTasks - 1) { // n reducers require exactly n - 1 split points
+ throw new IOException("Wrong number of partitions in keyset");
+ }
+ partitions = new BinarySearchNode(splitPoints, comparator);
+ } catch (IOException e) { // also catches the split-point count mismatch thrown above
+ throw new IllegalArgumentException("Can't read partitions file", e);
+ }
+ }
+ // Returns the partition chosen by the split-point lookup; value and modulo are not consulted.
+ @Override
+ public int getPartition(K key, V value, int modulo) {
+ return partitions.findPartition(key);
+ }
+ // Stores the split-point file location under PARTITIONER_PATH.
+ public static void setPartitionFile(Configuration conf, Path p) {
+ conf.set(PARTITIONER_PATH, p.toString());
+ }
+ // Reads the split-point file location, falling back to DEFAULT_PATH.
+ public static String getPartitionFile(Configuration conf) {
+ return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+ }
+ // Reads every key stored at p, sorts the keys with the job's comparator and returns them as an array.
+ @SuppressWarnings("unchecked") // map output key class
+ private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+ Configuration conf, final RawComparator<K> comparator) throws IOException {
+ ArrayList<K> parts = new ArrayList<K>();
+ String schema = conf.get("crunch.schema"); // a schema entry selects the Avro reader; otherwise the sequence-file reader is used
+ if (schema != null) {
+ Schema s = (new Schema.Parser()).parse(schema);
+ AvroFileReaderFactory<K> a = new AvroFileReaderFactory<K>(s);
+ Iterator<K> iter = CompositePathIterable.create(fs, p, a).iterator();
+ while (iter.hasNext()) {
+ parts.add((K) new AvroKey<K>(iter.next())); // each datum is wrapped in an AvroKey before it is stored
+ }
+ } else {
+ WritableDeepCopier wdc = new WritableDeepCopier(keyClass);
+ SeqFileReaderFactory<K> s = new SeqFileReaderFactory<K>(keyClass);
+ Iterator<K> iter = CompositePathIterable.create(fs, p, s).iterator();
+ while (iter.hasNext()) {
+ parts.add((K) wdc.deepCopy((Writable) iter.next())); // presumably the reader reuses its Writable instance, hence the copy - confirm
+ }
+ }
+ Collections.sort(parts, comparator); // binary search in BinarySearchNode needs sorted split points
+ return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+ }
+
+ /**
+ * Interface to the partitioner to locate a key in the partition keyset.
+ */
+ interface Node<T> {
+ /**
+ * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+ * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+ */
+ int findPartition(T key);
+ }
+ // Node implementation that binary-searches the sorted split-point array.
+ class BinarySearchNode implements Node<K> {
+ private final K[] splitPoints;
+ private final RawComparator<K> comparator;
+ BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+ this.splitPoints = splitPoints;
+ this.comparator = comparator;
+ }
+ public int findPartition(K key) {
+ final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1; // exact hit at index i gives i + 1; a miss gives -(insertion point)
+ return (pos < 0) ? -pos : pos; // a miss therefore resolves to the insertion point
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index e9c773c..d276cd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -24,7 +24,7 @@ import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -135,7 +135,7 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
public abstract void configureShuffle(Job job, GroupingOptions options);
@Override
- public SourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
+ public ReadableSourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
index 565615a..ebddf84 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -77,7 +77,7 @@ public interface PType<T> extends Serializable {
* Returns a {@code SourceTarget} that is able to read/write data using the serialization format
* specified by this {@code PType}.
*/
- SourceTarget<T> getDefaultFileSource(Path path);
+ ReadableSourceTarget<T> getDefaultFileSource(Path path);
/**
* Returns the sub-types that make up this PType if it is a composite instance, such as a tuple.
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index a0e2722..a92b0d0 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -24,8 +24,8 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.avro.AvroFileSourceTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.DeepCopier;
@@ -161,7 +161,7 @@ public class AvroType<T> implements PType<T> {
}
@Override
- public SourceTarget<T> getDefaultFileSource(Path path) {
+ public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
return new AvroFileSourceTarget<T>(path, this);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index 2f75b94..93e0fd6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -22,8 +22,8 @@ import java.util.List;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.fn.PairMapFn;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.Converter;
@@ -98,7 +98,7 @@ class WritableTableType<K, V> implements PTableType<K, V> {
}
@Override
- public SourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
+ public ReadableSourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
return new SeqFileTableSourceTarget<K, V>(path, this);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index fd64b3a..734946c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.seq.SeqFileSourceTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.DeepCopier;
@@ -90,7 +90,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
}
@Override
- public SourceTarget<T> getDefaultFileSource(Path path) {
+ public ReadableSourceTarget<T> getDefaultFileSource(Path path) {
return new SeqFileSourceTarget<T>(path, this);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
index 891cc6c..3e49930 100644
--- a/crunch/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
@@ -80,6 +80,23 @@ public class DistCache {
return value;
}
+  /**
+   * Registers a file with Hadoop's {@link DistributedCache} for jobs that use the given
+   * configuration. The file is identified by the URI form of {@code path}.
+   *
+   * @param path the file to add to the distributed cache
+   * @param conf the configuration in which the cache entry is recorded
+   */
+  public static void addCacheFile(Path path, Configuration conf) {
+    final java.net.URI cacheUri = path.toUri();
+    DistributedCache.addCacheFile(cacheUri, conf);
+  }
+
+  /**
+   * Looks up the localized copy of a distributed-cache file.
+   * <p>
+   * A localized file matches when its full path string ends with the final name component of
+   * {@code path}. The first match is returned, qualified against the local file system.
+   *
+   * @param path the path to look for; only {@code path.getName()} takes part in the match
+   * @param conf the configuration from which the localized cache files are read
+   * @return the qualified local path of the first matching file, or {@code null} when no
+   *         localized file matches or no cache files have been localized at all
+   * @throws CrunchRuntimeException wrapping any {@link IOException} raised while listing the
+   *         localized cache files or obtaining the local file system
+   */
+  public static Path getPathToCacheFile(Path path, Configuration conf) {
+    try {
+      Path[] localPaths = DistributedCache.getLocalCacheFiles(conf);
+      if (localPaths == null) {
+        // Hadoop reports "nothing localized" as null rather than an empty array; iterating
+        // over it directly would fail with a NullPointerException.
+        return null;
+      }
+      for (Path localPath : localPaths) {
+        // Suffix match on the string form, so directory components of localPath are ignored.
+        if (localPath.toString().endsWith(path.getName())) {
+          return localPath.makeQualified(FileSystem.getLocal(conf));
+        }
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return null;
+  }
+
/**
* Adds the specified jar to the distributed cache of jobs using the provided
* configuration. The jar will be placed on the classpath of tasks run by the
@@ -143,11 +160,11 @@ public class DistCache {
* @throws IOException
* If there is a problem searching for the jar file.
*/
- public static String findContainingJar(Class jarClass) throws IOException {
+ public static String findContainingJar(Class<?> jarClass) throws IOException {
ClassLoader loader = jarClass.getClassLoader();
String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
- for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
- URL url = (URL) itr.nextElement();
+ for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
+ URL url = itr.nextElement();
if ("jar".equals(url.getProtocol())) {
String toReturn = url.getPath();
if (toReturn.startsWith("file:")) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/e20cbf08/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java b/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
new file mode 100644
index 0000000..da8db6b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/util/PartitionUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.crunch.PCollection;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Helpers for choosing how many partitions to use when processing a {@link PCollection}.
+ */
+public class PartitionUtils {
+  /** Configuration key holding the number of bytes each reduce task is expected to handle. */
+  public static final String BYTES_PER_REDUCE_TASK = "crunch.bytes.per.reduce.task";
+
+  /** Value used when {@link #BYTES_PER_REDUCE_TASK} is not set: 10^9 bytes. */
+  public static final long DEFAULT_BYTES_PER_REDUCE_TASK = 1000L * 1000L * 1000L;
+
+  /**
+   * Recommends a partition count for a collection: one partition, plus one more for every full
+   * {@link #BYTES_PER_REDUCE_TASK} worth of the size reported by {@code pcollection.getSize()}
+   * (that size is treated as a byte count).
+   * <p>
+   * A configured value that is zero or negative is ignored in favour of
+   * {@link #DEFAULT_BYTES_PER_REDUCE_TASK}, and the result is always within
+   * {@code [1, Integer.MAX_VALUE]}.
+   *
+   * @param pcollection the collection whose size drives the recommendation
+   * @param conf the configuration consulted for {@link #BYTES_PER_REDUCE_TASK}
+   * @return the recommended number of partitions, never less than 1
+   */
+  public static <T> int getRecommendedPartitions(PCollection<T> pcollection, Configuration conf) {
+    long bytesPerTask = conf.getLong(BYTES_PER_REDUCE_TASK, DEFAULT_BYTES_PER_REDUCE_TASK);
+    if (bytesPerTask <= 0L) {
+      // Zero would divide by zero and a negative value would produce a negative count.
+      bytesPerTask = DEFAULT_BYTES_PER_REDUCE_TASK;
+    }
+    long extraPartitions = pcollection.getSize() / bytesPerTask;
+    // Clamp before narrowing so that neither the int cast nor the "1 +" can overflow.
+    extraPartitions = Math.max(0L, Math.min(extraPartitions, Integer.MAX_VALUE - 1L));
+    return 1 + (int) extraPartitions;
+  }
+}