You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/09/14 13:36:36 UTC

[2/2] git commit: CRUNCH-57. PCollections should have a length function that yields the number of elements in the PCollection.

CRUNCH-57. PCollections should have a length function that yields the number of elements in the PCollection.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8cea3d02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8cea3d02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8cea3d02

Branch: refs/heads/master
Commit: 8cea3d02e06a7038662fd57f4dd7f440d4066276
Parents: d49c07a
Author: Kiyan Ahmadizadeh <ki...@wibidata.com>
Authored: Tue Sep 4 20:38:23 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Sep 14 03:53:07 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/LengthTest.scala     |   39 ++++++++
 .../org/apache/crunch/scrunch/PCollection.scala    |    2 +-
 .../apache/crunch/scrunch/PCollectionLike.scala    |   15 +++-
 .../scala/org/apache/crunch/scrunch/PObject.scala  |    5 +-
 .../org/apache/crunch/CollectionsLengthIT.java     |   76 +++++++++++++++
 .../main/java/org/apache/crunch/PCollection.java   |    7 ++
 .../crunch/impl/mem/collect/MemCollection.java     |    5 +
 .../crunch/impl/mr/collect/PCollectionImpl.java    |    5 +
 .../main/java/org/apache/crunch/lib/Aggregate.java |   20 ++++
 9 files changed, 168 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
new file mode 100644
index 0000000..4a53e89
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.CrunchTestSupport
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/**
+ * Tests computing the number of elements in a PCollection from the Scala api.
+ */
+class LengthTest extends CrunchTestSupport with JUnitSuite {
+
+  @Test def testLength {
+    val linesInShakespear: Long = 3667
+    val pipeline = Pipeline.mapReduce[LengthTest](tempDir.getDefaultConfiguration)
+    val input = tempDir.copyResourceFileName("shakes.txt")
+
+    val len = pipeline.read(from.textFile(input)).length()
+    assert(linesInShakespear == len.value())
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 17a8c07..04f2a56 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -20,7 +20,7 @@ package org.apache.crunch.scrunch
 import scala.collection.JavaConversions
 
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
+import org.apache.crunch.{PCollection => JCollection, Pair => CPair}
 import org.apache.crunch.lib.{Aggregate, Cartesian}
 import org.apache.crunch.scrunch.Conversions._
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index d8d74fc..5aee5cf 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -23,9 +23,9 @@ import org.apache.crunch.types.{PType, PTableType}
 
 trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   val native: NativeType
-  
+
   def wrap(newNative: AnyRef): FullType
-  
+
   def write(target: Target): FullType = wrap(native.write(target))
 
   def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
@@ -43,6 +43,15 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
     new PTable[K, V](native.parallelDo(name, fn, ptype))
   }
-  
+
+  /**
+   * Gets the number of elements represented by this PCollection.
+   *
+   * @return The number of elements in this PCollection.
+   */
+  def length(): PObject[java.lang.Long] = {
+    PObject(native.length())
+  }
+
   def getTypeFamily() = Avros
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
index d52cb2c..5dcead4 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
@@ -18,7 +18,6 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.{PObject => JPObject}
-import org.apache.crunch.Target
 
 /**
  * Represents a singleton value that results from a distributed computation.
@@ -34,7 +33,9 @@ class PObject[T] private (private val native: JPObject[T]) {
    *
    * @return The value associated with this PObject.
    */
-  def value(): T = native.getValue()
+  def value(): T = {
+    native.getValue()
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
new file mode 100644
index 0000000..60385f0
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.Long;
+import java.util.Collection;
+
+import org.apache.crunch.PObject;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("serial")
+public class CollectionsLengthIT {
+
+  public static final Long LINES_IN_SHAKESPEAR = 3667L;
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(CollectionsIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryWritables() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryAvro() throws IOException {
+    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    Long length = shakespeare.length().getValue();
+    assertEquals("Incorrect length for shakespear PCollection.", LINES_IN_SHAKESPEAR, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 8e73159..f5a3465 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -130,6 +130,13 @@ public interface PCollection<S> {
   long getSize();
 
   /**
+   * Returns the number of elements represented by this {@code PCollection}.
+   *
+   * @return An {@code PObject} containing the number of elements in this {@code PCollection}.
+   */
+  PObject<Long> length();
+
+  /**
    * Returns a shorthand name for this PCollection.
    */
   String getName();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 9e5b6f2..a79ec2b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -166,6 +166,11 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public PObject<Long> length() {
+    return Aggregate.length(this);
+  }
+
+  @Override
   public PCollection<S> sample(double acceptanceProbability) {
     return Sample.sample(this, acceptanceProbability);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 486b976..d4948c0 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -163,6 +163,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
+  public PObject<Long> length() {
+    return Aggregate.length(this);
+  }
+
+  @Override
   public PObject<S> max() {
     return Aggregate.max(this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index c2c56c0..a0588e0 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -20,6 +20,7 @@ package org.apache.crunch.lib;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -59,6 +60,25 @@ public class Aggregate {
     }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey().combineValues(CombineFn.<S> SUM_LONGS());
   }
 
+  /**
+   * Returns the number of elements in the provided PCollection.
+   *
+   * @param collect The PCollection whose elements should be counted.
+   * @param <S> The type of the PCollection.
+   * @return A {@code PObject} containing the number of elements in the {@code PCollection}.
+   */
+  public static <S> PObject<Long> length(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    PTable<Integer, Long> countTable = collect.parallelDo("Aggregate.count",
+        new MapFn<S, Pair<Integer, Long>>() {
+          public Pair<Integer, Long> map(S input) {
+            return Pair.of(1, 1L);
+          }
+        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey().combineValues(CombineFn.<Integer> SUM_LONGS());
+    PCollection<Long> count = countTable.values();
+    return new FirstElementPObject<Long>(count);
+  }
+
   public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {
     private final boolean ascending;