You are viewing a plain text version of this content. The canonical version is available at the original archive link, which is not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by rs...@apache.org on 2012/10/06 05:25:00 UTC

git commit: CRUNCH-74: Add an initialize(Configuration conf) method to the Aggregator interface

Updated Branches:
  refs/heads/master ed7481d9c -> 1c95647be


CRUNCH-74: Add an initialize(Configuration conf) method to the Aggregator interface

Signed-off-by: Rahul Sharma <rs...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1c95647b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1c95647b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1c95647b

Branch: refs/heads/master
Commit: 1c95647be1abc0bc2927ea1aceec9dfdcf61667d
Parents: ed7481d
Author: Josh Wills <jw...@apache.org>
Authored: Fri Sep 28 18:13:18 2012 -0700
Committer: Rahul Sharma <rs...@apache.org>
Committed: Sat Oct 6 08:53:37 2012 +0530

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CollectionsIT.java   |    2 +-
 .../src/main/java/org/apache/crunch/CombineFn.java |   69 ++++++++++----
 2 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
index 3f372be..0d5803e 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("serial")
 public class CollectionsIT {
 
-  public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
+  public static class AggregateStringListFn extends CombineFn.SimpleAggregator<Collection<String>> {
     private final Collection<String> rtn = Lists.newArrayList();
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index 88fbbaf..246827d 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.SortedSet;
 
 import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -42,6 +43,12 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
 
   public static interface Aggregator<T> extends Serializable {
     /**
+     * Perform any setup of this instance that is required prior to processing
+     * inputs.
+     */
+    void initialize(Configuration configuration);
+
+    /**
      * Clears the internal state of this Aggregator and prepares it for the
      * values associated with the next key.
      */
@@ -60,6 +67,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   }
 
   /**
+   * Base class for aggregators that do not require any initialization.
+   */
+  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+    @Override
+    public void initialize(Configuration conf) {
+      // No-op
+    }
+  }
+  
+  /**
    * Interface for constructing new aggregator instances.
    */
   public static interface AggregatorFactory<T> {
@@ -79,6 +96,11 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
 
     @Override
+    public void initialize() {
+      aggregator.initialize(getConfiguration());
+    }
+    
+    @Override
     public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
       aggregator.reset();
       for (V v : input.second()) {
@@ -101,6 +123,13 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
 
     @Override
+    public void initialize(Configuration configuration) {
+      for (Aggregator<?> a : aggregators) {
+        a.initialize(configuration);
+      }
+    }
+    
+    @Override
     public void reset() {
       for (Aggregator<?> a : aggregators) {
         a.reset();
@@ -379,7 +408,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
       return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
   }
 
-  public static class SumLongs implements Aggregator<Long> {
+  public static class SumLongs extends SimpleAggregator<Long> {
     private long sum = 0;
 
     @Override
@@ -404,7 +433,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class SumInts implements Aggregator<Integer> {
+  public static class SumInts extends SimpleAggregator<Integer> {
     private int sum = 0;
 
     @Override
@@ -429,7 +458,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class SumFloats implements Aggregator<Float> {
+  public static class SumFloats extends SimpleAggregator<Float> {
     private float sum = 0;
 
     @Override
@@ -454,7 +483,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class SumDoubles implements Aggregator<Double> {
+  public static class SumDoubles extends SimpleAggregator<Double> {
     private double sum = 0;
 
     @Override
@@ -479,7 +508,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class SumBigInts implements Aggregator<BigInteger> {
+  public static class SumBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger sum = BigInteger.ZERO;
 
     @Override
@@ -504,7 +533,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxLongs implements Aggregator<Long> {
+  public static class MaxLongs extends SimpleAggregator<Long> {
     private Long max = null;
 
     @Override
@@ -531,7 +560,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxInts implements Aggregator<Integer> {
+  public static class MaxInts extends SimpleAggregator<Integer> {
     private Integer max = null;
 
     @Override
@@ -558,7 +587,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxFloats implements Aggregator<Float> {
+  public static class MaxFloats extends SimpleAggregator<Float> {
     private Float max = null;
 
     @Override
@@ -585,7 +614,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxDoubles implements Aggregator<Double> {
+  public static class MaxDoubles extends SimpleAggregator<Double> {
     private Double max = null;
 
     @Override
@@ -612,7 +641,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxBigInts implements Aggregator<BigInteger> {
+  public static class MaxBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger max = null;
 
     @Override
@@ -639,7 +668,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MinLongs implements Aggregator<Long> {
+  public static class MinLongs extends SimpleAggregator<Long> {
     private Long min = null;
 
     @Override
@@ -666,7 +695,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MinInts implements Aggregator<Integer> {
+  public static class MinInts extends SimpleAggregator<Integer> {
     private Integer min = null;
 
     @Override
@@ -693,7 +722,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MinFloats implements Aggregator<Float> {
+  public static class MinFloats extends SimpleAggregator<Float> {
     private Float min = null;
 
     @Override
@@ -720,7 +749,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MinDoubles implements Aggregator<Double> {
+  public static class MinDoubles extends SimpleAggregator<Double> {
     private Double min = null;
 
     @Override
@@ -747,7 +776,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MinBigInts implements Aggregator<BigInteger> {
+  public static class MinBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger min = null;
 
     @Override
@@ -774,7 +803,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   };
 
-  public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+  public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
 
@@ -807,7 +836,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
-  public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> {
+  public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
 
@@ -840,7 +869,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
-  public static class FirstNAggregator<V> implements Aggregator<V> {
+  public static class FirstNAggregator<V> extends SimpleAggregator<V> {
     private final int arity;
     private final List<V> elements;
 
@@ -867,7 +896,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
-  public static class LastNAggregator<V> implements Aggregator<V> {
+  public static class LastNAggregator<V> extends SimpleAggregator<V> {
     private final int arity;
     private final LinkedList<V> elements;
 
@@ -895,7 +924,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
-  public static class StringConcatAggregator implements Aggregator<String> {
+  public static class StringConcatAggregator extends SimpleAggregator<String> {
     private final String separator;
     private final boolean skipNulls;
     private final long maxOutputLength;