You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 03:52:31 UTC
git commit: CRUNCH-422: Scrunch collect(PartialFunction) support
Repository: crunch
Updated Branches:
refs/heads/master fd0bce36b -> ac4a525ad
CRUNCH-422: Scrunch collect(PartialFunction) support
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac4a525a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac4a525a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac4a525a
Branch: refs/heads/master
Commit: ac4a525ad65e44c564290beb4642f588e3b96e23
Parents: fd0bce3
Author: Josh Wills <jw...@apache.org>
Authored: Mon Jun 16 17:53:19 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 18:48:19 2014 -0700
----------------------------------------------------------------------
.../scala/org/apache/crunch/scrunch/Mem.scala | 6 ----
.../org/apache/crunch/scrunch/PCollection.scala | 4 +++
.../org/apache/crunch/scrunch/PTable.scala | 4 +++
.../crunch/scrunch/PartialFunctionTest.scala | 35 ++++++++++++++++++++
4 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
index 58d646f..66abb64 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
@@ -17,16 +17,10 @@
*/
package org.apache.crunch.scrunch
-import java.lang.{Iterable => JIterable}
-
import scala.collection.JavaConversions._
-import org.apache.hadoop.conf.Configuration
-
import org.apache.crunch.{Pair => P}
-import org.apache.crunch.{Source, TableSource, Target}
import org.apache.crunch.impl.mem.MemPipeline
-import org.apache.crunch.scrunch.Conversions._
/**
* Object for working with in-memory PCollection and PTable instances.
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 31c2f8a..e2f7b5b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -75,6 +75,10 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
+ def collect[T, To](pf: PartialFunction[S, T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) = { // Keeps only the elements for which pf is defined, then maps them through pf.
+ filter(pf.isDefinedAt(_)).map(pf)(pt, b) // NOTE(review): pf's pattern/guard runs twice per kept element (isDefinedAt in filter, apply in map).
+ }
+
def increment(groupName: String, counterName: String, count: Long) = {
new IncrementPCollection[S](this).apply(groupName, counterName, count)
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index aefad67..6fab61a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -51,6 +51,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
wrap(native.parallelDo("withPType", ident, pt))
}
+ def collect[T, To](pf: PartialFunction[(K, V), T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]) = { // Keeps only the (key, value) entries for which pf is defined, then maps them through pf.
+ filter((k, v) => pf.isDefinedAt((k, v))).map((k, v) => pf((k, v)))(pt, b) // NOTE(review): each kept entry is tupled again and pf's pattern/guard re-run in map.
+ }
+
def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
val ptf = getTypeFamily()
val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))
http://git-wip-us.apache.org/repos/asf/crunch/blob/ac4a525a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
new file mode 100644
index 0000000..2c2b75f
--- /dev/null
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/PartialFunctionTest.scala
@@ -0,0 +1,35 @@
+/*
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.junit.Test
+
+class PartialFunctionTest extends CrunchSuite { // Exercises collect(PartialFunction) on an in-memory PCollection and PTable.
+ @Test def testPartialFunction {
+ val pf = Mem.collectionOf(1, 2, 3).collect({ case i: Int if i != 3 => i + 1 }) // 3 is rejected by the guard; 1 and 2 map to 2 and 3.
+ org.junit.Assert.assertEquals(List(2, 3), pf.materialize().toList) // NOTE(review): List equality assumes the in-memory pipeline preserves input order — confirm.
+ }
+
+ @Test def testPartialFunctionPTable {
+ val pf = Mem.tableOf("a" -> 1, "b" -> 2, "c" -> 3).collect({ case (k, v) if k != "c" => v + 1 }) // Key "c" is rejected by the guard; values 1 and 2 map to 2 and 3.
+ org.junit.Assert.assertEquals(List(2, 3), pf.materialize().toList) // NOTE(review): List equality assumes the in-memory pipeline preserves input order — confirm.
+ }
+}