You are viewing a plain-text version of this message; the canonical (HTML) version is available in the mailing-list archive.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/06 05:25:00 UTC
git commit: CRUNCH-74: Add an initialize(Configuration conf) method to the Aggregator interface
Updated Branches:
refs/heads/master ed7481d9c -> 1c95647be
CRUNCH-74: Add an initialize(Configuration conf) method to the Aggregator interface
Signed-off-by: Rahul Sharma <rs...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1c95647b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1c95647b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1c95647b
Branch: refs/heads/master
Commit: 1c95647be1abc0bc2927ea1aceec9dfdcf61667d
Parents: ed7481d
Author: Josh Wills <jw...@apache.org>
Authored: Fri Sep 28 18:13:18 2012 -0700
Committer: Rahul Sharma <rs...@apache.org>
Committed: Sat Oct 6 08:53:37 2012 +0530
----------------------------------------------------------------------
.../it/java/org/apache/crunch/CollectionsIT.java | 2 +-
.../src/main/java/org/apache/crunch/CombineFn.java | 69 ++++++++++----
2 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
index 3f372be..0d5803e 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
@SuppressWarnings("serial")
public class CollectionsIT {
- public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
+ public static class AggregateStringListFn extends CombineFn.SimpleAggregator<Collection<String>> {
private final Collection<String> rtn = Lists.newArrayList();
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index 88fbbaf..246827d 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.SortedSet;
import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
@@ -42,6 +43,12 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
public static interface Aggregator<T> extends Serializable {
/**
+ * Perform any setup of this instance that is required prior to processing
+ * inputs.
+ */
+ void initialize(Configuration configuration);
+
+ /**
* Clears the internal state of this Aggregator and prepares it for the
* values associated with the next key.
*/
@@ -60,6 +67,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
/**
+ * Base class for aggregators that do not require any initialization.
+ */
+ public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+ @Override
+ public void initialize(Configuration conf) {
+ // No-op
+ }
+ }
+
+ /**
* Interface for constructing new aggregator instances.
*/
public static interface AggregatorFactory<T> {
@@ -79,6 +96,11 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
@Override
+ public void initialize() {
+ aggregator.initialize(getConfiguration());
+ }
+
+ @Override
public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
aggregator.reset();
for (V v : input.second()) {
@@ -101,6 +123,13 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
@Override
+ public void initialize(Configuration configuration) {
+ for (Aggregator<?> a : aggregators) {
+ a.initialize(configuration);
+ }
+ }
+
+ @Override
public void reset() {
for (Aggregator<?> a : aggregators) {
a.reset();
@@ -379,7 +408,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
}
- public static class SumLongs implements Aggregator<Long> {
+ public static class SumLongs extends SimpleAggregator<Long> {
private long sum = 0;
@Override
@@ -404,7 +433,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class SumInts implements Aggregator<Integer> {
+ public static class SumInts extends SimpleAggregator<Integer> {
private int sum = 0;
@Override
@@ -429,7 +458,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class SumFloats implements Aggregator<Float> {
+ public static class SumFloats extends SimpleAggregator<Float> {
private float sum = 0;
@Override
@@ -454,7 +483,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class SumDoubles implements Aggregator<Double> {
+ public static class SumDoubles extends SimpleAggregator<Double> {
private double sum = 0;
@Override
@@ -479,7 +508,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class SumBigInts implements Aggregator<BigInteger> {
+ public static class SumBigInts extends SimpleAggregator<BigInteger> {
private BigInteger sum = BigInteger.ZERO;
@Override
@@ -504,7 +533,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxLongs implements Aggregator<Long> {
+ public static class MaxLongs extends SimpleAggregator<Long> {
private Long max = null;
@Override
@@ -531,7 +560,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxInts implements Aggregator<Integer> {
+ public static class MaxInts extends SimpleAggregator<Integer> {
private Integer max = null;
@Override
@@ -558,7 +587,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxFloats implements Aggregator<Float> {
+ public static class MaxFloats extends SimpleAggregator<Float> {
private Float max = null;
@Override
@@ -585,7 +614,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxDoubles implements Aggregator<Double> {
+ public static class MaxDoubles extends SimpleAggregator<Double> {
private Double max = null;
@Override
@@ -612,7 +641,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxBigInts implements Aggregator<BigInteger> {
+ public static class MaxBigInts extends SimpleAggregator<BigInteger> {
private BigInteger max = null;
@Override
@@ -639,7 +668,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MinLongs implements Aggregator<Long> {
+ public static class MinLongs extends SimpleAggregator<Long> {
private Long min = null;
@Override
@@ -666,7 +695,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MinInts implements Aggregator<Integer> {
+ public static class MinInts extends SimpleAggregator<Integer> {
private Integer min = null;
@Override
@@ -693,7 +722,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MinFloats implements Aggregator<Float> {
+ public static class MinFloats extends SimpleAggregator<Float> {
private Float min = null;
@Override
@@ -720,7 +749,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MinDoubles implements Aggregator<Double> {
+ public static class MinDoubles extends SimpleAggregator<Double> {
private Double min = null;
@Override
@@ -747,7 +776,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MinBigInts implements Aggregator<BigInteger> {
+ public static class MinBigInts extends SimpleAggregator<BigInteger> {
private BigInteger min = null;
@Override
@@ -774,7 +803,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
};
- public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+ public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
private final int arity;
private transient SortedSet<V> elements;
@@ -807,7 +836,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
}
- public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+ public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
private final int arity;
private transient SortedSet<V> elements;
@@ -840,7 +869,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
}
- public static class FirstNAggregator<V> implements Aggregator<V> {
+ public static class FirstNAggregator<V> extends SimpleAggregator<V> {
private final int arity;
private final List<V> elements;
@@ -867,7 +896,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
}
- public static class LastNAggregator<V> implements Aggregator<V> {
+ public static class LastNAggregator<V> extends SimpleAggregator<V> {
private final int arity;
private final LinkedList<V> elements;
@@ -895,7 +924,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
}
}
- public static class StringConcatAggregator implements Aggregator<String> {
+ public static class StringConcatAggregator extends SimpleAggregator<String> {
private final String separator;
private final boolean skipNulls;
private final long maxOutputLength;