You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/03 21:43:41 UTC

crunch git commit: Add asPCollection method to PTable and corresponding Conversions method (CRUNCH-483)

Repository: crunch
Updated Branches:
  refs/heads/master 36c100e7b -> 3088d8275


Add asPCollection method to PTable and corresponding Conversions method (CRUNCH-483)

When using Scrunch PCollections and attempting to map to a pair of values, the
keyvalue implicit function in CanParallelDo will "upgrade" the result to a
PTable[K, V]. This is often the desired behaviour, but because Scrunch PTable is
not an extension of Scrunch PCollection, there are cases where this is not what
is wanted. Adding asPCollection allows for an explicit "downgrade" in those
cases where it is desirable to have a PCollection[(K, V)] instead.

Signed-off-by: Josh Wills <jo...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3088d827
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3088d827
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3088d827

Branch: refs/heads/master
Commit: 3088d827506c28ecb2b845a55fd14c4a0bda324b
Parents: 36c100e
Author: David Whiting <da...@spotify.com>
Authored: Thu Dec 18 13:50:26 2014 +0100
Committer: Josh Wills <jo...@gmail.com>
Committed: Sat Jan 3 12:39:52 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/crunch/scrunch/Conversions.scala    | 2 ++
 .../src/main/scala/org/apache/crunch/scrunch/PTable.scala     | 7 +++++++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3088d827/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index c7258ee..5807490 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -237,4 +237,6 @@ object Conversions {
   implicit def pair2tuple[K, V](p: CPair[K, V]) = (p.first(), p.second())
 
   implicit def tuple2pair[K, V](t: (K, V)) = CPair.of(t._1, t._2)
+  
+  implicit def ptable2pcollect[K, V](table: PTable[K, V]): PCollection[(K, V)] = table.asPCollection()
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3088d827/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index eaba071..e8b9ba0 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -222,6 +222,13 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     PObject(native.asMap())
   }
 
+  def asPCollection(): PCollection[(K, V)] = {
+    val pType = getTypeFamily().tuple2(native.getKeyType, native.getValueType)
+    new PCollection(native.parallelDo(new MapFn[CPair[K, V], (K, V)] {
+      override def map(input: CPair[K, V]): (K, V) = (input.first(), input.second())
+    }, pType))
+  }
+
   def pType() = native.getPTableType()
 
   def keyType() = native.getPTableType().getKeyType()