Posted to commits@spark.apache.org by li...@apache.org on 2017/08/31 16:44:33 UTC
spark git commit: [SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create…
Repository: spark
Updated Branches:
refs/heads/master 19b0240d4 -> 9696580c3
[SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create Dataset with LogicalRDD logical operator
## What changes were proposed in this pull request?
Reuse `SparkSession.internalCreateDataFrame` wherever possible, to cut duplication of the `Dataset.ofRows(LogicalRDD(...))` pattern at the call sites (a sketch of the helper follows below).
## How was this patch tested?
Local build and waiting for Jenkins
Author: Jacek Laskowski <ja...@japila.pl>
Closes #19095 from jaceklaskowski/SPARK-21886-internalCreateDataFrame.
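For context, the helper the call sites now delegate to is not shown in full in the diff below (only its signature appears as hunk context in SparkSession.scala). A minimal sketch, assuming `internalCreateDataFrame` simply wraps the already-converted `RDD[InternalRow]` in a `LogicalRDD` and hands it to `Dataset.ofRows`, that is, exactly the pattern removed at each call site:

```scala
// Sketch only; assumes the imports already present in
// org.apache.spark.sql.SparkSession:
//   org.apache.spark.rdd.RDD
//   org.apache.spark.sql.catalyst.InternalRow
//   org.apache.spark.sql.execution.LogicalRDD
//   org.apache.spark.sql.types.StructType
private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  // Wrap the Catalyst rows in a LogicalRDD leaf operator carrying the schema
  // and the streaming flag ...
  val logicalPlan = LogicalRDD(
    schema.toAttributes,
    catalystRows,
    isStreaming = isStreaming)(self)
  // ... and turn that logical plan into a DataFrame.
  Dataset.ofRows(self, logicalPlan)
}
```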
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9696580c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9696580c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9696580c
Branch: refs/heads/master
Commit: 9696580c33c68c3de32694fbefb93c509d525d94
Parents: 19b0240
Author: Jacek Laskowski <ja...@japila.pl>
Authored: Thu Aug 31 09:44:29 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Aug 31 09:44:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameReader.scala | 10 ++--------
.../scala/org/apache/spark/sql/SparkSession.scala | 16 ++++++++--------
2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 8209cec..4f375e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -410,10 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
-
- Dataset.ofRows(
- sparkSession,
- LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
+ sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
}
/**
@@ -473,10 +470,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
-
- Dataset.ofRows(
- sparkSession,
- LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
+ sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 863c316..d5ab53a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -559,8 +559,7 @@ class SparkSession private(
}
/**
- * Creates a `DataFrame` from an RDD[Row].
- * User can specify whether the input rows should be converted to Catalyst rows.
+ * Creates a `DataFrame` from an `RDD[InternalRow]`.
*/
private[sql] def internalCreateDataFrame(
catalystRows: RDD[InternalRow],
@@ -576,7 +575,7 @@ class SparkSession private(
}
/**
- * Creates a `DataFrame` from an RDD[Row].
+ * Creates a `DataFrame` from an `RDD[Row]`.
* User can specify whether the input rows should be converted to Catalyst rows.
*/
private[sql] def createDataFrame(
@@ -589,10 +588,9 @@ class SparkSession private(
val encoder = RowEncoder(schema)
rowRDD.map(encoder.toRow)
} else {
- rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
+ rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
}
- val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
- Dataset.ofRows(self, logicalPlan)
+ internalCreateDataFrame(catalystRows, schema)
}
@@ -737,13 +735,15 @@ class SparkSession private(
}
/**
- * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
+ * Apply `schema` to an RDD.
+ *
+ * @note Used by PySpark only
*/
private[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): DataFrame = {
val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
- Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
+ internalCreateDataFrame(rowRdd, schema)
}
/**
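Because `internalCreateDataFrame` is `private[sql]`, user code is unaffected; the rewritten call sites are still reached only through the public readers. A small, hypothetical driver (object name and sample data made up) that exercises `DataFrameReader.json(Dataset[String])`, one of the call sites touched above:

```scala
import org.apache.spark.sql.SparkSession

object Spark21886Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-21886 demo")
      .getOrCreate()
    import spark.implicits._

    // A Dataset[String] of JSON lines; read.json(Dataset[String]) now builds its
    // result via sparkSession.internalCreateDataFrame under the covers.
    val jsonLines = Seq("""{"id": 1, "name": "a"}""", """{"id": 2, "name": "b"}""").toDS()
    val df = spark.read.json(jsonLines)

    df.printSchema()
    df.show()

    spark.stop()
  }
}
```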