Posted to commits@spark.apache.org by li...@apache.org on 2017/08/31 16:44:33 UTC

spark git commit: [SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create…

Repository: spark
Updated Branches:
  refs/heads/master 19b0240d4 -> 9696580c3


[SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create…

… Dataset with LogicalRDD logical operator

## What changes were proposed in this pull request?

Reuse `SparkSession.internalCreateDataFrame` wherever possible, cutting the duplicated code that builds a `LogicalRDD` and wraps it with `Dataset.ofRows` at each call site. A sketch of the call-site change follows.
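
A minimal sketch contrasting the two shapes (not part of the patch). Note that both `Dataset.ofRows` and `internalCreateDataFrame` are `private[sql]`, so this only compiles inside the `org.apache.spark.sql` package; the parameter names stand in for whatever values each call site already has in scope.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Before: each call site built the LogicalRDD plan by hand and wrapped it itself.
def before(sparkSession: SparkSession, parsed: RDD[InternalRow],
           schema: StructType, isStreaming: Boolean): DataFrame =
  Dataset.ofRows(
    sparkSession,
    LogicalRDD(schema.toAttributes, parsed, isStreaming = isStreaming)(sparkSession))

// After: the existing private helper does the same wrapping in one place.
def after(sparkSession: SparkSession, parsed: RDD[InternalRow],
          schema: StructType, isStreaming: Boolean): DataFrame =
  sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = isStreaming)
```

Keeping the `LogicalRDD` construction behind one helper means the `isStreaming` flag and the owning session are threaded through in a single place.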

## How was this patch tested?

Local build; waiting for Jenkins.

Author: Jacek Laskowski <ja...@japila.pl>

Closes #19095 from jaceklaskowski/SPARK-21886-internalCreateDataFrame.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9696580c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9696580c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9696580c

Branch: refs/heads/master
Commit: 9696580c33c68c3de32694fbefb93c509d525d94
Parents: 19b0240
Author: Jacek Laskowski <ja...@japila.pl>
Authored: Thu Aug 31 09:44:29 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Aug 31 09:44:29 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala      | 10 ++--------
 .../scala/org/apache/spark/sql/SparkSession.scala   | 16 ++++++++--------
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 8209cec..4f375e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -410,10 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
-
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
+    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
   }
 
   /**
@@ -473,10 +470,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
-
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
+    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9696580c/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 863c316..d5ab53a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -559,8 +559,7 @@ class SparkSession private(
   }
 
   /**
-   * Creates a `DataFrame` from an RDD[Row].
-   * User can specify whether the input rows should be converted to Catalyst rows.
+   * Creates a `DataFrame` from an `RDD[InternalRow]`.
    */
   private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
@@ -576,7 +575,7 @@ class SparkSession private(
   }
 
   /**
-   * Creates a `DataFrame` from an RDD[Row].
+   * Creates a `DataFrame` from an `RDD[Row]`.
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
   private[sql] def createDataFrame(
@@ -589,10 +588,9 @@ class SparkSession private(
       val encoder = RowEncoder(schema)
       rowRDD.map(encoder.toRow)
     } else {
-      rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
+      rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
     }
-    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    Dataset.ofRows(self, logicalPlan)
+    internalCreateDataFrame(catalystRows, schema)
   }
 
 
@@ -737,13 +735,15 @@ class SparkSession private(
   }
 
   /**
-   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
+   * Apply `schema` to an RDD.
+   *
+   * @note Used by PySpark only
    */
   private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-    Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
+    internalCreateDataFrame(rowRdd, schema)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org