Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/02 21:09:49 UTC

git commit: [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.

Repository: spark
Updated Branches:
  refs/heads/master 9a5d482e0 -> d000ca98a


[SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.

For physical plans like `Limit` and `TakeOrdered`, `executeCollect()` applies optimizations that a plain `execute().collect()` does not; for example, it can fetch only as many rows as the limit requires instead of materializing and shipping every partition (see the sketch below).
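
Below is a minimal, self-contained sketch (plain Scala, no Spark dependency; all names are illustrative, not from this commit) of the kind of shortcut a specialized executeCollect() enables for a top-k query: take the top k rows of each partition and merge only those candidates, rather than sorting and shipping every row.

object TopKSketch {
  // Baseline: sort everything, ship everything, then truncate.
  def fullSortThenTake(partitions: Seq[Seq[Int]], k: Int): Seq[Int] =
    partitions.flatten.sorted.take(k)

  // Shortcut: take the top k of each partition first, then merge only
  // k * numPartitions candidates -- far less data crosses the network.
  def perPartitionTopK(partitions: Seq[Seq[Int]], k: Int): Seq[Int] =
    partitions.map(_.sorted.take(k)).flatten.sorted.take(k)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(5, 3, 9), Seq(1, 8, 2), Seq(7, 4, 6))
    assert(fullSortThenTake(parts, 3) == perPartitionTopK(parts, 3))
    println(perPartitionTopK(parts, 3))  // List(1, 2, 3)
  }
}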

Author: Cheng Lian <li...@gmail.com>

Closes #939 from liancheng/spark-1958 and squashes the following commits:

bdc4a14 [Cheng Lian] Copy rows to present immutable data to users
8250976 [Cheng Lian] Added return type explicitly for public API
192a25c [Cheng Lian] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d000ca98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d000ca98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d000ca98

Branch: refs/heads/master
Commit: d000ca98a80986ff5b13477547f1dcab7856ae63
Parents: 9a5d482
Author: Cheng Lian <li...@gmail.com>
Authored: Mon Jun 2 12:09:43 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jun 2 12:09:43 2014 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala   | 6 ++++++
 .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala  | 2 +-
 .../scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 2 +-
 3 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d000ca98/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index e855f36..8855c4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -369,6 +369,12 @@ class SchemaRDD(
   }
 
   // =======================================================================
+  // Overridden RDD actions
+  // =======================================================================
+
+  override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+
+  // =======================================================================
   // Base RDD functions that do NOT change schema
   // =======================================================================
 

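As a hedged usage sketch of the new override (assuming the Spark 1.0 API of the time; the Person class, table name, and data are illustrative, not part of this commit), a user-facing collect() now picks up the plan-specific fast path automatically:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

// Illustrative schema; not part of this commit.
case class Person(name: String, age: Int)

object CollectSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("collect-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD (Spark 1.0 API)

    sc.parallelize(Seq(Person("ann", 30), Person("bob", 25))).registerAsTable("people")

    // With this patch, collect() delegates to
    // queryExecution.executedPlan.executeCollect(), so the TakeOrdered plan
    // behind ORDER BY ... LIMIT avoids a full sort-and-collect.
    val rows: Array[Row] = sqlContext.sql("SELECT name FROM people ORDER BY age LIMIT 1").collect()
    rows.foreach(println)
    sc.stop()
  }
}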
http://git-wip-us.apache.org/repos/asf/spark/blob/d000ca98/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 235a9b1..4613df1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = execute().collect()
+  def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
   protected def buildRow(values: Seq[Any]): Row =
     new GenericRow(values.toArray)

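The added map(_.copy()) is what the "Copy rows to present immutable data to users" commit above refers to: for performance, an operator may reuse a single mutable row object per partition, so collecting the raw references would leave every element of the result array aliasing the same, last-written row. Here is a self-contained illustration of that aliasing pitfall (plain Scala; this MutableRow class is illustrative, not Spark's):

object CopyRowsSketch {
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
    override def toString: String = value.toString
  }

  def main(args: Array[String]): Unit = {
    // Reusing one mutable row: every collected slot is the same object.
    val shared = new MutableRow(0)
    val aliased = (1 to 3).iterator.map { i => shared.value = i; shared }.toArray
    println(aliased.mkString(" "))  // "3 3 3" -- all three entries aliased

    // Copying before materializing: each slot is an immutable snapshot.
    val shared2 = new MutableRow(0)
    val copied = (1 to 3).iterator.map { i => shared2.value = i; shared2.copy() }.toArray
    println(copied.mkString(" "))   // "1 2 3" -- what users expect
  }
}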
http://git-wip-us.apache.org/repos/asf/spark/blob/d000ca98/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index f9731e8..b973ceb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   }
 
   test("insert (appending) to same table via Scala API") {
-    sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+    sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
     assert(double_rdd != null)
     assert(double_rdd.size === 30)