You are viewing a plain text version of this content; the hyperlink to its canonical version is not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/04/16 00:02:16 UTC

git commit: CRUNCH-376: Add aggregate(...) method to PCollection. Contributed by Jason Gauci, and then split aggregate(...) into aggregate(...) and first().

Repository: crunch
Updated Branches:
  refs/heads/master b559e0e58 -> 9355e74ec


CRUNCH-376: Add aggregate(...) method to PCollection. Contributed by Jason Gauci,
and then split aggregate(...) into aggregate(...) and first().


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9355e74e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9355e74e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9355e74e

Branch: refs/heads/master
Commit: 9355e74ec221bd6e2835ac591e7e30f1654799b5
Parents: b559e0e
Author: Josh Wills <jw...@apache.org>
Authored: Mon Apr 14 17:32:01 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Apr 15 15:00:09 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/PCollection.java     | 11 +++
 .../impl/dist/collect/PCollectionImpl.java      | 11 +++
 .../crunch/impl/mem/collect/MemCollection.java  | 10 +++
 .../java/org/apache/crunch/lib/Aggregate.java   | 13 ++++
 .../apache/crunch/examples/TotalWordCount.java  | 78 ++++++++++++++++++++
 5 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 2d62d00..bf5bacc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -183,6 +183,11 @@ public interface PCollection<S> {
   PObject<Collection<S>> asCollection();
 
   /**
+   * @return A {@code PObject} for the first element of this {@code PCollection}, e.g. the result of {@link #aggregate}.
+   */
+  PObject<S> first();
+
+  /**
    * @return A reference to the data in this instance that can be read from a job running
    * on a cluster.
    *
@@ -267,4 +272,10 @@ public interface PCollection<S> {
    * Returns a {@code PObject} of the minimum element of this instance.
    */
   PObject<S> min();
+  
+  /** Returns a {@code PCollection} containing the result of aggregating all values in this instance with
+   * {@code aggregator}; call {@link #first()} on the returned collection to read that result as a {@code PObject}.
+   */
+  PCollection<S> aggregate(Aggregator<S> aggregator);
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index cb9c60c..9167863 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -19,6 +19,8 @@ package org.apache.crunch.impl.dist.collect;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
@@ -37,6 +39,7 @@ import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
+import org.apache.crunch.materialize.pobject.FirstElementPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
@@ -211,6 +214,9 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return new CollectionPObject<S>(this);
   }
 
+  @Override
+  public PObject<S> first() { return new FirstElementPObject<S>(this); } // same wrapper MemCollection.first() uses
+
   public SourceTarget<S> getMaterializedAt() {
     return materializedAt;
   }
@@ -259,6 +265,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   public PObject<S> min() {
     return Aggregate.min(this);
   }
+  
+  @Override
+  public PCollection<S> aggregate(Aggregator<S> aggregator) {
+    return Aggregate.aggregate(this, aggregator); // shared lib.Aggregate helper, also used by MemCollection
+  }
 
   @Override
   public PTypeFamily getTypeFamily() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 81433eb..8e509bc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -25,6 +25,7 @@ import javassist.util.proxy.MethodFilter;
 import javassist.util.proxy.MethodHandler;
 import javassist.util.proxy.ProxyFactory;
 
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
@@ -42,6 +43,7 @@ import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
+import org.apache.crunch.materialize.pobject.FirstElementPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
@@ -184,6 +186,9 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public PObject<S> first() { return new FirstElementPObject<S>(this); } // same wrapper PCollectionImpl.first() uses
+
+  @Override
   public ReadableData<S> asReadable(boolean materialize) {
     return new MemReadableData<S>(collect);
   }
@@ -241,6 +246,11 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public PCollection<S> aggregate(Aggregator<S> aggregator) {
+    return Aggregate.aggregate(this, aggregator); // same lib.Aggregate helper PCollectionImpl delegates to
+  }
+  
+  @Override
   public PCollection<S> filter(FilterFn<S> filterFn) {
     return parallelDo(filterFn, getPType());
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index d8388b3..3d132d4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
@@ -277,4 +278,16 @@ public class Aggregate {
           }
         }, tf.collections(collect.getValueType()));
   }
+  
+  public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) {
+    PTypeFamily tf = collect.getTypeFamily(); // Folds every value of collect into one group using aggregator.
+    return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Boolean, S>>() {
+      public Pair<Boolean, S> map(S input) {
+        return Pair.of(false, input); // constant dummy key, so every value lands in the same group
+      }
+    }, tf.tableOf(tf.booleans(), collect.getPType()))
+    .groupByKey(1) // NOTE(review): 1 is presumably the partition count; a single key needs only one
+    .combineValues(aggregator) // apply the caller's Aggregator to that single group
+    .values(); // drop the dummy Boolean key, keeping only the aggregated value(s)
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java
new file mode 100644
index 0000000..374cec1
--- /dev/null
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.examples;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/** Example pipeline that prints the total number of whitespace-separated words in its input. */
+public class TotalWordCount extends Configured implements Tool, Serializable {
+  public int run(String[] args) throws Exception {
+    if (args.length != 1) {
+      System.err.println();
+      System.err.println("Usage: " + this.getClass().getName() + " [generic options] input");
+      System.err.println();
+      GenericOptionsParser.printGenericCommandUsage(System.err);
+      return 1;
+    }
+    // Create an object to coordinate pipeline creation and execution.
+    Pipeline pipeline = new MRPipeline(TotalWordCount.class, getConf());
+    // Reference a given text file as a collection of Strings.
+    PCollection<String> lines = pipeline.readTextFile(args[0]);
+
+    // Count the words on each line. Trim first so indented lines don't yield an empty
+    // leading token from split(), and treat blank lines as containing zero words.
+    PCollection<Long> numberOfWords = lines.parallelDo(new DoFn<String, Long>() {
+      public void process(String line, Emitter<Long> emitter) {
+        String trimmed = line.trim();
+        emitter.emit(trimmed.isEmpty() ? 0L : (long) trimmed.split("\\s+").length);
+      }
+    }, Writables.longs()); // Indicates the serialization format
+
+    // aggregate() sums the per-line counts into one group; first() reads the total as a PObject.
+    PObject<Long> totalCount = numberOfWords.aggregate(Aggregators.SUM_LONGS()).first();
+
+    // Execute the pipeline as a MapReduce.
+    PipelineResult result = pipeline.run();
+
+    System.out.println("Total number of words: " + totalCount.getValue());
+    pipeline.done();
+
+    return result.succeeded() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(new Configuration(), new TotalWordCount(), args);
+    System.exit(result);
+  }
+}