You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/20 08:07:42 UTC

git commit: CRUNCH-398: Add Increment one-liner functions to PCollection and PTable.

Repository: crunch
Updated Branches:
  refs/heads/master bbeb7537a -> b90427f3e


CRUNCH-398: Add Increment one-liner functions to PCollection and PTable.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b90427f3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b90427f3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b90427f3

Branch: refs/heads/master
Commit: b90427f3efbbb6c5b7bdde57aa2ca6d464ff0f18
Parents: bbeb753
Author: Josh Wills <jw...@apache.org>
Authored: Sat May 17 09:51:35 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon May 19 22:58:08 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/scrunch/IncrementTest.scala   |  45 ++++++
 .../org/apache/crunch/scrunch/Increment.scala   | 140 +++++++++++++++++++
 .../org/apache/crunch/scrunch/PCollection.scala |  10 +-
 .../org/apache/crunch/scrunch/PTable.scala      |  24 +++-
 4 files changed, 217 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
new file mode 100644
index 0000000..338051c
--- /dev/null
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/IncrementTest.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+
+import _root_.org.junit.Test
+import _root_.org.junit.Assert.assertEquals
+
+/** Verifies that counters set with `increment`/`incrementIf` are reported in the stage results. */
+class IncrementTest extends CrunchSuite {
+
+  @Test def testIncrement(): Unit = {
+    val pipeline = Pipeline.mapReduce[IncrementTest](tempDir.getDefaultConfiguration)
+    val input = tempDir.copyResourceFileName("shakes.txt")
+
+    // Count every token, then only the non-empty ones, then add 2 per word that starts with "a".
+    val tokens = pipeline.read(from.textFile(input, Avros.strings)).flatMap(_.toLowerCase.split("\\s+"))
+    val words = tokens.increment("TOP", "ALLWORDS").filter(token => !token.isEmpty())
+    words.increment("TOP", "NONEMPTY")
+      .incrementIf(_ startsWith "a")("TOP", "AWORDS_2x", 2)
+      .write(to.avroFile(tempDir.getFileName("somewords")))
+
+    // Every counter is read back from the first stage result.
+    val firstStage = pipeline.done().getStageResults.get(0)
+    assertEquals(21836, firstStage.getCounterValue("TOP", "ALLWORDS"))
+    assertEquals(20366, firstStage.getCounterValue("TOP", "NONEMPTY"))
+    assertEquals(3604, firstStage.getCounterValue("TOP", "AWORDS_2x"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
new file mode 100644
index 0000000..f6b6ffa
--- /dev/null
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Increment.scala
@@ -0,0 +1,140 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.{FilterFn, Pair => CPair}
+
+/**
+ * An `Incrementable[T]` lets a named counter be incremented and then returns an object of
+ * type `T`, so counter updates chain inline. [[PCollection]] and [[PTable]] both mix it in.
+ */
+trait Incrementable[T] {
+  /** Adds 1 to the counter identified by a Java enum constant. */
+  def increment(counter: Enum[_]): T = increment(counter, 1)
+
+  /**
+   * Adds `count` to the counter identified by a Java enum constant. The group is the
+   * declaring class's canonical name: `getClass` is an anonymous subclass (null canonical
+   * name) for constants that have their own class body.
+   */
+  def increment(counter: Enum[_], count: Long): T =
+    increment(counter.getDeclaringClass.getCanonicalName, counter.toString, count)
+
+  /** Adds 1 to the counter named by a Scala `Enumeration` (group) and one of its values. */
+  def increment(groupEnum: Enumeration, value: Enumeration#Value): T = increment(groupEnum, value, 1)
+
+  /** Adds `count` to the counter named by `groupEnum.toString` and `value.toString`. */
+  def increment(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T =
+    increment(groupEnum.toString, value.toString, count)
+
+  /** Adds 1 to the counter `counterName` in group `groupName`. */
+  def increment(groupName: String, counterName: String): T = increment(groupName, counterName, 1)
+
+  /** Adds `count` to the counter `counterName` in group `groupName`; implementors define this. */
+  def increment(groupName: String, counterName: String, count: Long): T
+}
+
+/**
+ * An `Increment[T]` is a pending counter update: applying it to a counter group and
+ * name (and optionally a count) yields the resulting `T`. The conditional helpers
+ * `PCollection#incrementIf` and `PTable#incrementIf`, `incrementIfKey` and
+ * `incrementIfValue` return instances of it, each producing a new PCollection/PTable.
+ */
+trait Increment[T] {
+  /** Applies the update with a count of 1 to the counter identified by a Java enum constant. */
+  def apply(counter: Enum[_]): T = apply(counter, 1)
+
+  /**
+   * Applies the update to the counter identified by a Java enum constant. The group is the
+   * declaring class's canonical name: `getClass` is an anonymous subclass (null canonical
+   * name) for constants that have their own class body.
+   */
+  def apply(counter: Enum[_], count: Long): T =
+    apply(counter.getDeclaringClass.getCanonicalName, counter.toString, count)
+
+  /** Applies the update with a count of 1 to a Scala `Enumeration` group and value. */
+  def apply(groupEnum: Enumeration, value: Enumeration#Value): T = apply(groupEnum, value, 1)
+
+  /** Applies the update using `groupEnum.toString` and `value.toString` as the names. */
+  def apply(groupEnum: Enumeration, value: Enumeration#Value, count: Long): T =
+    apply(groupEnum.toString, value.toString, count)
+
+  /** Applies the update with a count of 1 to counter `counterName` in group `groupName`. */
+  def apply(groupName: String, counterName: String): T = apply(groupName, counterName, 1)
+
+  /** Applies the update with an explicit count; concrete subclasses define this. */
+  def apply(groupName: String, counterName: String, count: Long): T
+}
+
+/** Unconditional [[Increment]] for a [[PCollection]]: runs a [[CounterFn]] over `pc`. */
+class IncrementPCollection[S](val pc: PCollection[S]) extends Increment[PCollection[S]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    val countAll = new CounterFn[S](groupName, counterName, count)
+    pc.parallelDo("inc=" + groupName + ":" + counterName, countAll, pc.pType())
+  }
+}
+
+/** Conditional [[Increment]] for a [[PCollection]]: runs an [[IfCounterFn]] guarded by `f`. */
+class IncrementIfPCollection[S](val pc: PCollection[S], val f: S => Boolean) extends Increment[PCollection[S]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    val countMatching = new IfCounterFn[S](groupName, counterName, count, f)
+    pc.parallelDo("incif=" + groupName + ":" + counterName, countMatching, pc.pType())
+  }
+}
+
+/** Unconditional [[Increment]] for a [[PTable]]: runs a [[CounterFn]] over the table's pairs. */
+class IncrementPTable[K, V](val pc: PTable[K, V]) extends Increment[PTable[K, V]] {
+  override def apply(groupName: String, counterName: String, count: Long) = {
+    val countAll = new CounterFn[CPair[K, V]](groupName, counterName, count)
+    pc.parallelDo("inc=" + groupName + ":" + counterName, countAll, pc.pType())
+  }
+}
+
+/** Conditional [[Increment]] for a [[PTable]]: runs an [[IfCounterFn]] guarded by `f` over the pairs. */
+class IncrementIfPTable[K, V](val pc: PTable[K, V], val f: CPair[K, V] => Boolean) extends Increment[PTable[K, V]] {
+  // Labelled "incif=" like IncrementIfPCollection, so it differs from the unconditional "inc=" label.
+  override def apply(groupName: String, counterName: String, count: Long) =
+    pc.parallelDo("incif=" + groupName + ":" + counterName,
+      new IfCounterFn[CPair[K, V]](groupName, counterName, count, f), pc.pType())
+}
+
+/** A [[FilterFn]] that adds `count` to counter `group`/`counter` per element and accepts them all. */
+class CounterFn[S](val group: String, val counter: String, val count: Long) extends FilterFn[S] {
+  // NOTE(review): presumably 1.0 because accept never rejects an element -- confirm.
+  override def scaleFactor(): Float = 1.0f
+  def accept(s: S): Boolean = {
+    increment(group, counter, count)
+    true
+  }
+}
+
+/** A [[FilterFn]] that adds `count` to counter `group`/`counter` when `cond` holds; accepts all. */
+class IfCounterFn[S](val group: String, val counter: String, val count: Long, val cond: S => Boolean)
+  extends FilterFn[S] {
+  // NOTE(review): presumably 1.0 because accept never rejects an element -- confirm.
+  override def scaleFactor(): Float = 1.0f
+  def accept(s: S): Boolean = {
+    val matched = cond(s)
+    if (matched) increment(group, counter, count)
+    true  // the element is kept whether or not it matched
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 2d4ed44..dc0ab0b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -27,7 +27,9 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext
 import org.apache.crunch.types.PType
 import org.apache.crunch.fn.IdentityFn
 
-class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
+class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]]
+  with Incrementable[PCollection[S]] {
+
   import PCollection._
 
   type FunctionType[T] = S => T
@@ -73,6 +75,12 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
 
   protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
 
+  /** Adds `count` to the named counter for every element, via [[IncrementPCollection]]. */
+  def increment(groupName: String, counterName: String, count: Long) =
+    new IncrementPCollection[S](this).apply(groupName, counterName, count)
+  /** Returns an [[Increment]] that, once applied, counts only the elements satisfying `f`. */
+  def incrementIf(f: S => Boolean) = new IncrementIfPCollection(this, f)
+
   def count() = {
     val count = new PTable[S, java.lang.Long](Aggregate.count(native))
     count.mapValues(_.longValue())

http://git-wip-us.apache.org/repos/asf/crunch/blob/b90427f3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 2f88b0c..1d5a70e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -30,7 +30,8 @@ import scala.collection.Iterable
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
 import org.apache.crunch.fn.IdentityFn
 
-class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
+class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]]
+  with Incrementable[PTable[K, V]] {
   import PTable._
 
   type FunctionType[T] = (K, V) => T
@@ -143,6 +144,16 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
   }
 
+  /** Adds `count` to the named counter for every pair, via [[IncrementPTable]]. */
+  def increment(groupName: String, counterName: String, count: Long) =
+    new IncrementPTable[K, V](this).apply(groupName, counterName, count)
+  /** Returns an [[Increment]] that counts only the pairs whose key and value satisfy `f`. */
+  def incrementIf(f: (K, V) => Boolean) = new IncrementIfPTable(this, incFn[K, V, Boolean](f))
+  /** Returns an [[Increment]] that counts only the pairs whose key satisfies `f`. */
+  def incrementIfKey(f: K => Boolean) = new IncrementIfPTable(this, incKeyFn[K, V, Boolean](f))
+  /** Returns an [[Increment]] that counts only the pairs whose value satisfies `f`. */
+  def incrementIfValue(f: V => Boolean) = new IncrementIfPTable(this, incValueFn[K, V, Boolean](f))
+
   def materialize(): Iterable[(K, V)] = {
     InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
     native.materialize.view.map(x => (x.first, x.second))
@@ -265,4 +276,15 @@ object PTable {
     new SDoPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
   }
 
+  /** Lifts a (key, value) function to a serializable function over a `CPair`. */
+  def incFn[K, V, T](fn: (K, V) => T) =
+    new Function1[CPair[K, V], T] with Serializable { def apply(kv: CPair[K, V]): T = fn(kv.first(), kv.second()) }
+
+  /** Lifts a key-only function to a serializable function over a `CPair`, ignoring the value. */
+  def incKeyFn[K, V, T](fn: K => T) =
+    new Function1[CPair[K, V], T] with Serializable { def apply(kv: CPair[K, V]): T = fn(kv.first()) }
+
+  /** Lifts a value-only function to a serializable function over a `CPair`, ignoring the key. */
+  def incValueFn[K, V, T](fn: V => T) =
+    new Function1[CPair[K, V], T] with Serializable { def apply(kv: CPair[K, V]): T = fn(kv.second()) }
 }