You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mail-archive page.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 03:52:31 UTC

git commit: CRUNCH-422: Scrunch collect(PartialFunction) support

Repository: crunch
Updated Branches:
  refs/heads/master fd0bce36b -> ac4a525ad


CRUNCH-422: Scrunch collect(PartialFunction) support


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac4a525a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac4a525a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac4a525a

Branch: refs/heads/master
Commit: ac4a525ad65e44c564290beb4642f588e3b96e23
Parents: fd0bce3
Author: Josh Wills <jw...@apache.org>
Authored: Mon Jun 16 17:53:19 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 18:48:19 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/crunch/scrunch/Mem.scala   |  6 ----
 .../org/apache/crunch/scrunch/PCollection.scala |  4 +++
 .../org/apache/crunch/scrunch/PTable.scala      |  4 +++
 .../crunch/scrunch/PartialFunctionTest.scala    | 35 ++++++++++++++++++++
 4 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
index 58d646f..66abb64 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
@@ -17,16 +17,10 @@
  */
 package org.apache.crunch.scrunch
 
-import java.lang.{Iterable => JIterable}
-
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.crunch.{Pair => P}
-import org.apache.crunch.{Source, TableSource, Target}
 import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.crunch.scrunch.Conversions._
 
 /**
  * Object for working with in-memory PCollection and PTable instances.

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 31c2f8a..e2f7b5b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -75,6 +75,10 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
 
   protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
 
+  /** Keeps only the elements on which `pf` is defined, then transforms each of them with `pf`. */
+  def collect[T, To](pf: PartialFunction[S, T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) =
+    filter(elem => pf.isDefinedAt(elem)).map(pf)(pt, b)
+
   def increment(groupName: String, counterName: String, count: Long) = {
     new IncrementPCollection[S](this).apply(groupName, counterName, count)
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index aefad67..6fab61a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -51,6 +51,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     wrap(native.parallelDo("withPType", ident, pt))
   }
 
+  /** Keeps only the (key, value) pairs on which `pf` is defined, then transforms each of them with `pf`. */
+  def collect[T, To](pf: PartialFunction[(K, V), T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) =
+    filter((key, value) => pf.isDefinedAt((key, value))).map((key, value) => pf((key, value)))(pt, b)
+
   def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
new file mode 100644
index 0000000..2c2b75f
--- /dev/null
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
@@ -0,0 +1,35 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.junit.Test
+
+class PartialFunctionTest extends CrunchSuite { // exercises collect(PartialFunction) on PCollection and PTable
+  @Test def testPartialFunction(): Unit = { // elements failing the guard are dropped, the rest are mapped
+    val collected = Mem.collectionOf(1, 2, 3).collect({ case i: Int if i != 3 => i + 1 })
+    org.junit.Assert.assertEquals(List(2, 3), collected.materialize().toList)
+  }
+
+  @Test def testPartialFunctionPTable(): Unit = { // the partial function receives (key, value) tuples
+    val collected = Mem.tableOf("a" -> 1, "b" -> 2, "c" -> 3).collect({ case (k, v) if k != "c" => v + 1 })
+    org.junit.Assert.assertEquals(List(2, 3), collected.materialize().toList)
+  }
+}