You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/20 08:07:42 UTC
git commit: CRUNCH-398: Add Increment one-liner functions to
PCollection and PTable.
Repository: crunch
Updated Branches:
refs/heads/master bbeb7537a -> b90427f3e
CRUNCH-398: Add Increment one-liner functions to PCollection and PTable.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b90427f3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b90427f3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b90427f3
Branch: refs/heads/master
Commit: b90427f3efbbb6c5b7bdde57aa2ca6d464ff0f18
Parents: bbeb753
Author: Josh Wills <jw...@apache.org>
Authored: Sat May 17 09:51:35 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon May 19 22:58:08 2014 -0700
----------------------------------------------------------------------
.../apache/crunch/scrunch/IncrementTest.scala | 45 ++++++
.../org/apache/crunch/scrunch/Increment.scala | 140 +++++++++++++++++++
.../org/apache/crunch/scrunch/PCollection.scala | 10 +-
.../org/apache/crunch/scrunch/PTable.scala | 24 +++-
4 files changed, 217 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
new file mode 100644
index 0000000..338051c
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+
+import _root_.org.junit.Test
+import _root_.org.junit.Assert.assertEquals
+
+/**
+ * Integration test for the {@code increment} / {@code incrementIf} one-liners
+ * on a Scrunch {@code PCollection}, run through a MapReduce pipeline over
+ * the "shakes.txt" resource.
+ */
+class IncrementTest extends CrunchSuite {
+
+  @Test def testIncrement {
+    val mrPipeline = Pipeline.mapReduce[IncrementTest](tempDir.getDefaultConfiguration)
+    val shakesPath = tempDir.copyResourceFileName("shakes.txt")
+
+    // Stage 1: count every token produced by splitting on whitespace.
+    val allWords = mrPipeline.read(from.textFile(shakesPath, Avros.strings))
+      .flatMap(line => line.toLowerCase.split("\\s+"))
+      .increment("TOP", "ALLWORDS")
+
+    // Stage 2: count only the tokens that survive the non-empty filter.
+    val nonEmptyWords = allWords
+      .filter(word => !word.isEmpty())
+      .increment("TOP", "NONEMPTY")
+
+    // Stage 3: add 2 to the counter for each remaining token starting with "a".
+    nonEmptyWords
+      .incrementIf(word => word.startsWith("a"))("TOP", "AWORDS_2x", 2)
+      .write(to.avroFile(tempDir.getFileName("somewords")))
+
+    val firstStage = mrPipeline.done().getStageResults.get(0)
+    assertEquals(21836, firstStage.getCounterValue("TOP", "ALLWORDS"))
+    assertEquals(20366, firstStage.getCounterValue("TOP", "NONEMPTY"))
+    assertEquals(3604, firstStage.getCounterValue("TOP", "AWORDS_2x"))
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
new file mode 100644
index 0000000..f6b6ffa
--- /dev/null
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
@@ -0,0 +1,140 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.{FilterFn, Pair => CPair}
+
+/**
+ * The {@code Incrementable[T]} trait defines an object that allows a counter to
+ * be incremented and then returns a reference to another object of the same type.
+ * Both the {@link PCollection} and {@link PTable} types in Scrunch support the
+ * {@code Incrementable} trait.
+ *
+ * Every overload ends up in the abstract
+ * {@code increment(groupName, counterName, count)}; the overloads that take no
+ * {@code count} pass 1.
+ */
+trait Incrementable[T] {
+  /** Increments the counter named by a Java enum constant by 1. */
+  def increment(counter: Enum[_]): T = {
+    increment(counter, 1)
+  }
+
+  /**
+   * Increments the counter named by a Java enum constant by {@code count}.
+   * The group name is the canonical name of the enum type and the counter
+   * name is {@code counter.toString}.
+   */
+  def increment(counter: Enum[_], count: Long): T = {
+    // getDeclaringClass, not getClass: for a constant with a constant-specific
+    // class body getClass is an anonymous subclass whose canonical name is null.
+    // For ordinary enum constants both calls return the same class.
+    increment(counter.getDeclaringClass.getCanonicalName, counter.toString, count)
+  }
+
+  /** Increments the counter named by a Scala {@code Enumeration} value by 1. */
+  def increment(groupEnum: Enumeration, value: Enumeration#Value): T = {
+    increment(groupEnum, value, 1)
+  }
+
+  /**
+   * Increments the counter named by a Scala {@code Enumeration} value by
+   * {@code count}, using {@code groupEnum.toString} as the group name and
+   * {@code value.toString} as the counter name.
+   */
+  def increment(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T = {
+    increment(groupEnum.toString, value.toString, count)
+  }
+
+  /** Increments the counter {@code groupName}/{@code counterName} by 1. */
+  def increment(groupName: String, counterName: String): T = {
+    increment(groupName, counterName, 1)
+  }
+
+  /**
+   * Increments the counter {@code groupName}/{@code counterName} by
+   * {@code count}. Implemented by the concrete collection type.
+   */
+  def increment(groupName: String, counterName: String, count: Long): T
+}
+
+/**
+ * Incrementable classes may also support conditionally incrementing a counter,
+ * such as via the {@link PCollection#incrementIf} method or the
+ * {@link PTable#incrementIf}, {@link PTable#incrementIfKey} and
+ * {@link PTable#incrementIfValue} methods. In these cases, the return type
+ * is an instance of {@code Increment} that returns a reference to a new PCollection/PTable
+ * after it is applied to a specified counter group and value.
+ *
+ * Every overload ends up in the abstract
+ * {@code apply(groupName, counterName, count)}; the overloads that take no
+ * {@code count} pass 1.
+ */
+trait Increment[T] {
+
+  /** Applies the increment to the counter named by a Java enum constant, adding 1. */
+  def apply(counter: Enum[_]): T = {
+    apply(counter, 1)
+  }
+
+  /**
+   * Applies the increment to the counter named by a Java enum constant, adding
+   * {@code count}. The group name is the canonical name of the enum type and
+   * the counter name is {@code counter.toString}.
+   */
+  def apply(counter: Enum[_], count: Long): T = {
+    // getDeclaringClass, not getClass: for a constant with a constant-specific
+    // class body getClass is an anonymous subclass whose canonical name is null.
+    // For ordinary enum constants both calls return the same class.
+    apply(counter.getDeclaringClass.getCanonicalName, counter.toString, count)
+  }
+
+  /** Applies the increment to the counter named by a Scala {@code Enumeration} value, adding 1. */
+  def apply(groupEnum: Enumeration, value: Enumeration#Value): T = {
+    apply(groupEnum, value, 1)
+  }
+
+  /**
+   * Applies the increment to the counter named by a Scala {@code Enumeration}
+   * value, adding {@code count}; {@code groupEnum.toString} is the group name
+   * and {@code value.toString} the counter name.
+   */
+  def apply(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T = {
+    apply(groupEnum.toString, value.toString, count)
+  }
+
+  /** Applies the increment to {@code groupName}/{@code counterName}, adding 1. */
+  def apply(groupName: String, counterName: String): T = {
+    apply(groupName, counterName, 1)
+  }
+
+  /**
+   * Applies the increment to {@code groupName}/{@code counterName}, adding
+   * {@code count}. Implemented by the concrete {@code Increment} classes.
+   */
+  def apply(groupName: String, counterName: String, count: Long): T
+}
+
+/**
+ * {@code Increment} over a {@link PCollection}: applying it runs a
+ * {@link CounterFn} over {@code pc} via {@code parallelDo}, keeping the
+ * collection's own {@code pType}.
+ */
+class IncrementPCollection[S](val pc: PCollection[S]) extends Increment[PCollection[S]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    // Name handed to parallelDo: "inc=<group>:<counter>".
+    val doName = "inc=" + groupName + ":" + counterName
+    val countingFn = new CounterFn[S](groupName, counterName, count)
+    pc.parallelDo(doName, countingFn, pc.pType())
+  }
+}
+
+/**
+ * Conditional {@code Increment} over a {@link PCollection}: applying it runs
+ * an {@link IfCounterFn} built from the predicate {@code f} over {@code pc}
+ * via {@code parallelDo}, keeping the collection's own {@code pType}.
+ */
+class IncrementIfPCollection[S](val pc: PCollection[S], val f: S => Boolean) extends Increment[PCollection[S]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    // Name handed to parallelDo: "incif=<group>:<counter>".
+    val doName = "incif=" + groupName + ":" + counterName
+    val conditionalFn = new IfCounterFn[S](groupName, counterName, count, f)
+    pc.parallelDo(doName, conditionalFn, pc.pType())
+  }
+}
+
+/**
+ * {@code Increment} over a {@link PTable}: applying it runs a
+ * {@link CounterFn} over the table's key/value pairs via {@code parallelDo},
+ * keeping the table's own {@code pType}.
+ */
+class IncrementPTable[K, V](val pc: PTable[K, V]) extends Increment[PTable[K, V]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    // Name handed to parallelDo: "inc=<group>:<counter>".
+    val doName = "inc=" + groupName + ":" + counterName
+    val countingFn = new CounterFn[CPair[K, V]](groupName, counterName, count)
+    pc.parallelDo(doName, countingFn, pc.pType())
+  }
+}
+
+/**
+ * Conditional {@code Increment} over a {@link PTable}: applying it runs an
+ * {@link IfCounterFn} built from the pair predicate {@code f} over the
+ * table's key/value pairs via {@code parallelDo}, keeping the table's own
+ * {@code pType}.
+ */
+class IncrementIfPTable[K, V](val pc: PTable[K, V], val f: CPair[K, V] => Boolean) extends Increment[PTable[K, V]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    // Use the "incif=" prefix, as IncrementIfPCollection does, so a conditional
+    // increment is not given the same name as an unconditional one.
+    pc.parallelDo("incif=" + groupName + ":" + counterName,
+      new IfCounterFn[CPair[K, V]](groupName, counterName, count, f),
+      pc.pType())
+  }
+}
+
+/**
+ * A {@code FilterFn} that accepts every element and, for each one it sees,
+ * calls {@code increment(group, counter, count)}.
+ */
+class CounterFn[S](val group: String, val counter: String, val count: Long)
+  extends FilterFn[S] {
+
+  /** Bumps the counter for {@code s} and always keeps the element. */
+  def accept(s: S): Boolean = {
+    increment(group, counter, count)
+    true
+  }
+
+  /** Reports a scale factor of 1.0, since {@code accept} never rejects an element. */
+  override def scaleFactor(): Float = 1.0f
+}
+
+/**
+ * A {@code FilterFn} that accepts every element and calls
+ * {@code increment(group, counter, count)} only for the elements on which
+ * {@code cond} returns true.
+ */
+class IfCounterFn[S](val group: String, val counter: String, val count: Long, val cond: S => Boolean)
+  extends FilterFn[S] {
+
+  /** Bumps the counter when {@code cond(s)} holds; the element is kept either way. */
+  def accept(s: S): Boolean = {
+    val matched = cond(s)
+    if (matched) increment(group, counter, count)
+    true
+  }
+
+  /** Reports a scale factor of 1.0, since {@code accept} never rejects an element. */
+  override def scaleFactor(): Float = 1.0f
+}
+
http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 2d4ed44..dc0ab0b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -27,7 +27,9 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext
import org.apache.crunch.types.PType
import org.apache.crunch.fn.IdentityFn
-class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
+class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]]
+ with Incrementable[PCollection[S]] {
+
import PCollection._
type FunctionType[T] = S => T
@@ -73,6 +75,12 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
+ /**
+  * Increments {@code groupName}/{@code counterName} by {@code count} by
+  * applying an {@link IncrementPCollection} built around this collection.
+  */
+ def increment(groupName: String, counterName: String, count: Long) = {
+   val incrementer = new IncrementPCollection[S](this)
+   incrementer(groupName, counterName, count)
+ }
+
+ /**
+  * Returns an {@link IncrementIfPCollection} over this collection with the
+  * predicate {@code f}; apply it to a counter group and name to get the new
+  * collection.
+  */
+ def incrementIf(f: S => Boolean) = {
+   new IncrementIfPCollection[S](this, f)
+ }
+
def count() = {
val count = new PTable[S, java.lang.Long](Aggregate.count(native))
count.mapValues(_.longValue())
http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 2f88b0c..1d5a70e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -30,7 +30,8 @@ import scala.collection.Iterable
import org.apache.hadoop.mapreduce.TaskInputOutputContext
import org.apache.crunch.fn.IdentityFn
-class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
+class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]]
+ with Incrementable[PTable[K, V]] {
import PTable._
type FunctionType[T] = (K, V) => T
@@ -143,6 +144,16 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
}
+ /**
+  * Increments {@code groupName}/{@code counterName} by {@code count} by
+  * applying an {@link IncrementPTable} built around this table.
+  */
+ def increment(groupName: String, counterName: String, count: Long) = {
+   val incrementer = new IncrementPTable[K, V](this)
+   incrementer(groupName, counterName, count)
+ }
+
+ /**
+  * Returns an {@link IncrementIfPTable} over this table whose condition is
+  * {@code f} applied to each pair's key and value (adapted via {@code incFn}).
+  */
+ def incrementIf(f: (K, V) => Boolean) = {
+   val pairPredicate = incFn[K, V, Boolean](f)
+   new IncrementIfPTable[K, V](this, pairPredicate)
+ }
+
+ /**
+  * Returns an {@link IncrementIfPTable} over this table whose condition is
+  * {@code f} applied to each pair's key (adapted via {@code incKeyFn}).
+  */
+ def incrementIfKey(f: K => Boolean) = {
+   val keyPredicate = incKeyFn[K, V, Boolean](f)
+   new IncrementIfPTable[K, V](this, keyPredicate)
+ }
+
+ /**
+  * Returns an {@link IncrementIfPTable} over this table whose condition is
+  * {@code f} applied to each pair's value (adapted via {@code incValueFn}).
+  */
+ def incrementIfValue(f: V => Boolean) = {
+   val valuePredicate = incValueFn[K, V, Boolean](f)
+   new IncrementIfPTable[K, V](this, valuePredicate)
+ }
+
def materialize(): Iterable[(K, V)] = {
InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
native.materialize.view.map(x => (x.first, x.second))
@@ -265,4 +276,15 @@ object PTable {
new SDoPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
}
+ /**
+  * Adapts a two-argument function over (key, value) into a serializable
+  * function over a {@code CPair[K, V]}, passing the pair's first and second
+  * elements to {@code fn}.
+  */
+ def incFn[K, V, T](fn: (K, V) => T) = new Function1[CPair[K, V], T] with Serializable {
+   def apply(pair: CPair[K, V]): T = {
+     val key = pair.first()
+     val value = pair.second()
+     fn(key, value)
+   }
+ }
+
+ /**
+  * Adapts a function over keys into a serializable function over a
+  * {@code CPair[K, V]}, passing only the pair's first element to {@code fn}.
+  */
+ def incKeyFn[K, V, T](fn: K => T) = new Function1[CPair[K, V], T] with Serializable {
+   def apply(pair: CPair[K, V]): T = {
+     val key = pair.first()
+     fn(key)
+   }
+ }
+
+ /**
+  * Adapts a function over values into a serializable function over a
+  * {@code CPair[K, V]}, passing only the pair's second element to {@code fn}.
+  */
+ def incValueFn[K, V, T](fn: V => T) = new Function1[CPair[K, V], T] with Serializable {
+   def apply(pair: CPair[K, V]): T = {
+     val value = pair.second()
+     fn(value)
+   }
+ }
}