You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/01 10:02:39 UTC
spark git commit: [SQL] [MINOR] remove internalRowRDD in DataFrame
Repository: spark
Updated Branches:
refs/heads/master fc3a6fe67 -> 0eee06158
[SQL] [MINOR] remove internalRowRDD in DataFrame
Developers have already familiar with `queryExecution.toRDD` as internal row RDD, and we should not add new concept.
Author: Wenchen Fan <cl...@outlook.com>
Closes #7116 from cloud-fan/internal-rdd and squashes the following commits:
24756ca [Wenchen Fan] remove internalRowRDD
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eee0615
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eee0615
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eee0615
Branch: refs/heads/master
Commit: 0eee0615894cda8ae1b2c8e61b8bda0ff648a219
Parents: fc3a6fe
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed Jul 1 01:02:33 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 1 01:02:33 2015 -0700
----------------------------------------------------------------------
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 4 +---
.../org/apache/spark/sql/execution/stat/FrequentItems.scala | 2 +-
.../org/apache/spark/sql/execution/stat/StatFunctions.scala | 2 +-
.../src/main/scala/org/apache/spark/sql/sources/commands.scala | 4 ++--
4 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8fe1f7e..caad2da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1469,14 +1469,12 @@ class DataFrame private[sql](
lazy val rdd: RDD[Row] = {
// use a local variable to make sure the map closure doesn't capture the whole DataFrame
val schema = this.schema
- internalRowRdd.mapPartitions { rows =>
+ queryExecution.toRdd.mapPartitions { rows =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
rows.map(converter(_).asInstanceOf[Row])
}
}
- private[sql] def internalRowRdd = queryExecution.executedPlan.execute()
-
/**
* Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
* @group rdd
http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 3ebbf96..4e2e2c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
(name, originalSchema.fields(index).dataType)
}
- val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
+ val freqItems = df.select(cols.map(Column(_)) : _*).queryExecution.toRdd.aggregate(countMaps)(
seqOp = (counts, row) => {
var i = 0
while (i < numCols) {
http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index b624ef7..23ddfa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -82,7 +82,7 @@ private[sql] object StatFunctions extends Logging {
s"with dataType ${data.get.dataType} not supported.")
}
val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType)))
- df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)(
+ df.select(columns: _*).queryExecution.toRdd.aggregate(new CovarianceCounter)(
seqOp = (counter, row) => {
counter.add(row.getDouble(0), row.getDouble(1))
},
http://git-wip-us.apache.org/repos/asf/spark/blob/0eee0615/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 42b51ca..7214eb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
writerContainer.driverSideSetup()
try {
- df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+ df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
writerContainer.commitJob()
relation.refresh()
} catch { case cause: Throwable =>
@@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
writerContainer.driverSideSetup()
try {
- df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
+ df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
writerContainer.commitJob()
relation.refresh()
} catch { case cause: Throwable =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org