Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/01 10:02:39 UTC

spark git commit: [SQL] [MINOR] remove internalRowRDD in DataFrame

Repository: spark
Updated Branches:
  refs/heads/master fc3a6fe67 -> 0eee06158


[SQL] [MINOR] remove internalRowRDD in DataFrame

Developers are already familiar with `queryExecution.toRdd` as the internal row RDD, so we should not add a new concept for the same thing.
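
For context, a minimal sketch of the two entry points this commit contrasts; it assumes a live `SQLContext` named `sqlContext`, which is not part of this commit:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Hypothetical illustration only. `df.rdd` is the public API and
    // converts Catalyst values to Scala types row by row, while
    // `queryExecution.toRdd` exposes the underlying RDD of Catalyst
    // internal rows with no conversion.
    val df = sqlContext.range(0, 10)

    // Public API: RDD[Row], one Catalyst-to-Scala conversion per row.
    val external: RDD[Row] = df.rdd

    // Internal API: the same data as Catalyst internal rows, as used by
    // FrequentItems, StatFunctions and InsertIntoHadoopFsRelation below.
    val internal = df.queryExecution.toRdd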

Author: Wenchen Fan <cl...@outlook.com>

Closes #7116 from cloud-fan/internal-rdd and squashes the following commits:

24756ca [Wenchen Fan] remove internalRowRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eee0615
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eee0615
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eee0615

Branch: refs/heads/master
Commit: 0eee0615894cda8ae1b2c8e61b8bda0ff648a219
Parents: fc3a6fe
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed Jul 1 01:02:33 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 1 01:02:33 2015 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala     | 4 +---
 .../org/apache/spark/sql/execution/stat/FrequentItems.scala      | 2 +-
 .../org/apache/spark/sql/execution/stat/StatFunctions.scala      | 2 +-
 .../src/main/scala/org/apache/spark/sql/sources/commands.scala   | 4 ++--
 4 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8fe1f7e..caad2da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1469,14 +1469,12 @@ class DataFrame private[sql](
   lazy val rdd: RDD[Row] = {
     // use a local variable to make sure the map closure doesn't capture the whole DataFrame
     val schema = this.schema
-    internalRowRdd.mapPartitions { rows =>
+    queryExecution.toRdd.mapPartitions { rows =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
       rows.map(converter(_).asInstanceOf[Row])
     }
   }
 
-  private[sql] def internalRowRdd = queryExecution.executedPlan.execute()
-
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
    * @group rdd

http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 3ebbf96..4e2e2c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }
 
-    val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : _*).queryExecution.toRdd.aggregate(countMaps)(
       seqOp = (counts, row) => {
         var i = 0
         while (i < numCols) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index b624ef7..23ddfa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging {
         s"with dataType ${data.get.dataType} not supported.")
     }
     val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType)))
-    df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)(
+    df.select(columns: _*).queryExecution.toRdd.aggregate(new CovarianceCounter)(
       seqOp = (counter, row) => {
         counter.add(row.getDouble(0), row.getDouble(1))
       },

http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 42b51ca..7214eb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
@@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>

