You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/18 19:02:37 UTC

git commit: Reuses Row object in ExistingRdd.productToRowRdd()

Repository: spark
Updated Branches:
  refs/heads/master e31c8ffca -> 89f47434e


Reuses Row object in ExistingRdd.productToRowRdd()

Author: Cheng Lian <li...@gmail.com>

Closes #432 from liancheng/reuseRow and squashes the following commits:

9e6d083 [Cheng Lian] Simplified code with BufferedIterator
52acec9 [Cheng Lian] Reuses Row object in ExistingRdd.productToRowRdd()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89f47434
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89f47434
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89f47434

Branch: refs/heads/master
Commit: 89f47434e2a6c2f8b80c44d08f866d3a8b8e85c3
Parents: e31c8ff
Author: Cheng Lian <li...@gmail.com>
Authored: Fri Apr 18 10:02:27 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Apr 18 10:02:27 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/basicOperators.scala    | 21 +++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89f47434/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index ab2e624..eedcc7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
-
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
   override def output = projectList.map(_.toAttribute)
 
@@ -143,8 +142,24 @@ object ExistingRdd {
   }
 
   def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {  // Turns each Product in `data` into a Row, one partition at a time.
-    // TODO: Reuse the row, don't use map on the product iterator.  Maybe code gen?
-    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
+    data.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val bufferedIterator = iterator.buffered  // buffered so `head` can be read below without consuming the first element
+        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)  // one row per partition, sized from the first element only
+
+        bufferedIterator.map { r =>
+          var i = 0
+          while (i < mutableRow.length) {
+            mutableRow(i) = r.productElement(i)  // NOTE(review): the removed line applied convertToCatalyst to each element; this does not -- confirm intended
+            i += 1
+          }
+
+          mutableRow  // the same instance is returned for every element; NOTE(review): presumably consumers must copy before buffering -- verify
+        }
+      }
+    }
   }
 
   def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {