You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/09/14 13:36:36 UTC
[2/2] git commit: CRUNCH-57. PCollections should have a length
function that yields the number of elements in the PCollection.
CRUNCH-57. PCollections should have a length function that yields the number of elements in the PCollection.
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/8cea3d02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/8cea3d02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/8cea3d02
Branch: refs/heads/master
Commit: 8cea3d02e06a7038662fd57f4dd7f440d4066276
Parents: d49c07a
Author: Kiyan Ahmadizadeh <ki...@wibidata.com>
Authored: Tue Sep 4 20:38:23 2012 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Sep 14 03:53:07 2012 -0700
----------------------------------------------------------------------
.../org/apache/crunch/scrunch/LengthTest.scala | 39 ++++++++
.../org/apache/crunch/scrunch/PCollection.scala | 2 +-
.../apache/crunch/scrunch/PCollectionLike.scala | 15 +++-
.../scala/org/apache/crunch/scrunch/PObject.scala | 5 +-
.../org/apache/crunch/CollectionsLengthIT.java | 76 +++++++++++++++
.../main/java/org/apache/crunch/PCollection.java | 7 ++
.../crunch/impl/mem/collect/MemCollection.java | 5 +
.../crunch/impl/mr/collect/PCollectionImpl.java | 5 +
.../main/java/org/apache/crunch/lib/Aggregate.java | 20 ++++
9 files changed, 168 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
new file mode 100644
index 0000000..4a53e89
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/LengthTest.scala
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.CrunchTestSupport
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/**
+ * Verifies that the Scala API reports how many elements a PCollection holds.
+ */
+class LengthTest extends CrunchTestSupport with JUnitSuite {
+  @Test def testLength {
+    // Number of lines the shakes.txt test resource is expected to contain.
+    val expectedLineCount: Long = 3667
+    val mrPipeline = Pipeline.mapReduce[LengthTest](tempDir.getDefaultConfiguration)
+    val shakesPath = tempDir.copyResourceFileName("shakes.txt")
+
+    val lineCount = mrPipeline.read(from.textFile(shakesPath)).length()
+    assert(expectedLineCount == lineCount.value())
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 17a8c07..04f2a56 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -20,7 +20,7 @@ package org.apache.crunch.scrunch
import scala.collection.JavaConversions
import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
+import org.apache.crunch.{PCollection => JCollection, Pair => CPair}
import org.apache.crunch.lib.{Aggregate, Cartesian}
import org.apache.crunch.scrunch.Conversions._
import org.apache.crunch.scrunch.interpreter.InterpreterRunner
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index d8d74fc..5aee5cf 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -23,9 +23,9 @@ import org.apache.crunch.types.{PType, PTableType}
trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
val native: NativeType
-
+
def wrap(newNative: AnyRef): FullType
-
+
def write(target: Target): FullType = wrap(native.write(target))
def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
@@ -43,6 +43,15 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
new PTable[K, V](native.parallelDo(name, fn, ptype))
}
-
+
+  /**
+   * Counts the elements held by this PCollection.
+   * @return A PObject wrapping the element count reported by the native collection.
+   */
+  def length(): PObject[java.lang.Long] = {
+    val nativeLength = native.length()
+    PObject(nativeLength)
+  }
+
def getTypeFamily() = Avros
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
index d52cb2c..5dcead4 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PObject.scala
@@ -18,7 +18,6 @@
package org.apache.crunch.scrunch
import org.apache.crunch.{PObject => JPObject}
-import org.apache.crunch.Target
/**
* Represents a singleton value that results from a distributed computation.
@@ -34,7 +33,9 @@ class PObject[T] private (private val native: JPObject[T]) {
*
* @return The value associated with this PObject.
*/
- def value(): T = native.getValue()
+ def value(): T = {
+ native.getValue() // delegates to the wrapped Java PObject
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
new file mode 100644
index 0000000..60385f0
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.Long;
+import java.util.Collection;
+
+import org.apache.crunch.PObject;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Integration test for {@code PCollection#length()} against MapReduce and in-memory pipelines.
+ */
+public class CollectionsLengthIT {
+
+  /** Number of lines the {@code shakes.txt} test resource is expected to contain. */
+  public static final Long LINES_IN_SHAKESPEAR = 3667L;
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(CollectionsLengthIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(CollectionsLengthIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryWritables() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryAvro() throws IOException {
+    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Reads {@code shakes.txt} through the given pipeline and asserts the reported length.
+   *
+   * @param pipeline the pipeline used to read the text file
+   * @param typeFamily NOTE(review): not referenced by this method, so the Writable and Avro
+   *     variants above currently exercise the same read path
+   * @throws IOException declared because copying the test resource may fail (TODO confirm)
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    Long length = shakespeare.length().getValue();
+    assertEquals("Incorrect length for shakespear PCollection.", LINES_IN_SHAKESPEAR, length);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 8e73159..f5a3465 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -130,6 +130,13 @@ public interface PCollection<S> {
long getSize();
 /**
+ * Returns the number of elements represented by this {@code PCollection}.
+ *
+ * @return A {@code PObject} containing the number of elements in this {@code PCollection}.
+ */
+ PObject<Long> length();
+
+ /**
* Returns a shorthand name for this PCollection.
*/
String getName();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 9e5b6f2..a79ec2b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -166,6 +166,11 @@ public class MemCollection<S> implements PCollection<S> {
}
 @Override
+ public PObject<Long> length() {
+ return Aggregate.length(this); // element counting is delegated to the Aggregate helper
+ }
+
+ @Override
public PCollection<S> sample(double acceptanceProbability) {
return Sample.sample(this, acceptanceProbability);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 486b976..d4948c0 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -163,6 +163,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
 @Override
+ public PObject<Long> length() {
+ return Aggregate.length(this); // element counting is delegated to the Aggregate helper
+ }
+
+ @Override
public PObject<S> max() {
return Aggregate.max(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/8cea3d02/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index c2c56c0..a0588e0 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -20,6 +20,7 @@ package org.apache.crunch.lib;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
@@ -59,6 +60,25 @@ public class Aggregate {
}, tf.tableOf(collect.getPType(), tf.longs())).groupByKey().combineValues(CombineFn.<S> SUM_LONGS());
}
+  /**
+   * Returns the number of elements in the provided PCollection.
+   *
+   * <p>Every element is mapped to the pair {@code (1, 1L)}; the pairs are grouped under the
+   * single key and their values summed. The first value of the resulting table is the count.
+   *
+   * <p>NOTE(review): if the function is never run for the input at all, the count table is
+   * still empty; confirm what {@code FirstElementPObject} yields in that case.
+   *
+   * @param collect The PCollection whose elements should be counted.
+   * @param <S> The type of the PCollection.
+   * @return A {@code PObject} containing the number of elements in the {@code PCollection}.
+   */
+  public static <S> PObject<Long> length(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    PTable<Integer, Long> countTable = collect.parallelDo("Aggregate.count",
+        new MapFn<S, Pair<Integer, Long>>() {
+          @Override
+          public Pair<Integer, Long> map(S input) {
+            return Pair.of(1, 1L);
+          }
+
+          // Contribute a zero once the input is exhausted, so a collection with no
+          // elements still produces a row to sum instead of an empty count table.
+          @Override
+          public void cleanup(org.apache.crunch.Emitter<Pair<Integer, Long>> emitter) {
+            emitter.emit(Pair.of(1, 0L));
+          }
+        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey().combineValues(CombineFn.<Integer> SUM_LONGS());
+    PCollection<Long> count = countTable.values();
+    return new FirstElementPObject<Long>(count);
+  }
+
public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {
private final boolean ascending;