Posted to commits@spark.apache.org by gu...@apache.org on 2019/03/21 02:13:48 UTC

[spark] branch master updated: [SPARK-27223][SQL] Remove private methods that skip conversion when passing user schemas for constructing a DataFrame

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e090ba  [SPARK-27223][SQL] Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
2e090ba is described below

commit 2e090ba628d4622f28e934f602bf8bc9783924c8
Author: maryannxue <ma...@apache.org>
AuthorDate: Thu Mar 21 11:13:25 2019 +0900

    [SPARK-27223][SQL] Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
    
    ## What changes were proposed in this pull request?
    
    When passing in a user schema to create a DataFrame, there might be mismatched nullability between the user schema and the actual data. All related public interfaces now perform Catalyst conversion using the user-provided schema, which catches such mismatches and avoids runtime errors later on. However, there are still private methods that allow this conversion to be skipped; they can lead to confusion and potential issues, so this PR removes them.
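    
    For illustration, a minimal sketch of the kind of mismatch this conversion catches (assuming a local `SparkSession` named `spark`; the schema and data below are illustrative, not taken from this PR):
    
    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
    
    // The user schema declares the field as non-nullable ...
    val schema = StructType(Seq(StructField("id", IntegerType, nullable = false)))
    
    // ... but the actual data contains a null for that field.
    val rdd = spark.sparkContext.parallelize(Seq(Row(1), Row(null)))
    
    // The public createDataFrame always routes the rows through RowEncoder,
    // so the mismatch surfaces as an explicit null-check error when the rows
    // are converted (i.e. when an action evaluates the RDD), rather than as
    // an obscure failure further downstream.
    val df = spark.createDataFrame(rdd, schema)
    df.collect()  // fails the encoder's null check for the non-nullable field
    ```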
    
    ## How was this patch tested?
    
    Passed existing tests. No new tests were added, since this PR only removes the private interfaces that could potentially cause null problems; the remaining interfaces are already covered by existing tests.
    
    Closes #24162 from maryannxue/spark-27223.
    
    Authored-by: maryannxue <ma...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../scala/org/apache/spark/sql/SQLContext.scala    |  9 --------
 .../scala/org/apache/spark/sql/SparkSession.scala  | 25 +++++-----------------
 .../org/apache/spark/sql/DataFrameSuite.scala      |  6 +++---
 3 files changed, 8 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 43f34e6..08b7521 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -325,15 +325,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   }
 
   /**
-   * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
-   * converted to Catalyst rows.
-   */
-  private[sql]
-  def createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean) = {
-    sparkSession.createDataFrame(rowRDD, schema, needsConversion)
-  }
-
-  /**
    * :: Experimental ::
    * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
    * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5208b9a..0b5bf3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -361,7 +361,11 @@ class SparkSession private(
   @DeveloperApi
   @Evolving
   def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
-    createDataFrame(rowRDD, schema, needsConversion = true)
+    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
+    // schema differs from the existing schema on any field data type.
+    val encoder = RowEncoder(schema)
+    val catalystRows = rowRDD.map(encoder.toRow)
+    internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
   }
 
   /**
@@ -590,25 +594,6 @@ class SparkSession private(
     Dataset.ofRows(self, logicalPlan)
   }
 
-  /**
-   * Creates a `DataFrame` from an `RDD[Row]`.
-   * User can specify whether the input rows should be converted to Catalyst rows.
-   */
-  private[sql] def createDataFrame(
-      rowRDD: RDD[Row],
-      schema: StructType,
-      needsConversion: Boolean) = {
-    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
-    // schema differs from the existing schema on any field data type.
-    val catalystRows = if (needsConversion) {
-      val encoder = RowEncoder(schema)
-      rowRDD.map(encoder.toRow)
-    } else {
-      rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
-    }
-    internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
-  }
-
 
   /* ------------------------- *
    |  Catalog-related methods  |
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6dec129..78decd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1572,8 +1572,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       val rdd = sparkContext.makeRDD(Seq(Row(1, 3), Row(2, 1)))
       val df = spark.createDataFrame(
         rdd,
-        new StructType().add("f1", IntegerType).add("f2", IntegerType),
-        needsConversion = false).select($"F1", $"f2".as("f2"))
+        new StructType().add("f1", IntegerType).add("f2", IntegerType))
+        .select($"F1", $"f2".as("f2"))
       val df1 = df.as("a")
       val df2 = df.as("b")
       checkAnswer(df1.join(df2, $"a.f2" === $"b.f2"), Row(1, 3, 1, 3) :: Row(2, 1, 2, 1) :: Nil)
@@ -1774,7 +1774,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val size = 201L
     val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size))))
     val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
-    val df = spark.createDataFrame(rdd, StructType(schemas), false)
+    val df = spark.createDataFrame(rdd, StructType(schemas))
     assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
   }
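
A minimal sketch of the conversion path the two-argument createDataFrame now always takes, mirroring the patched SparkSession code above. RowEncoder is an internal Catalyst helper, shown here only to illustrate the patch; `spark`, the schema, and the data are illustrative assumptions, not part of the commit:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val schema = StructType(Seq(StructField("v", LongType, nullable = false)))

// RowEncoder(schema) builds an ExpressionEncoder[Row] for the user schema;
// encoder.toRow converts each external Row to a Catalyst InternalRow and
// enforces the schema (including nullability) during that conversion.
val encoder = RowEncoder(schema)
val internalRows = spark.sparkContext
  .parallelize(Seq(Row(1L), Row(2L)))
  .map(encoder.toRow)
```

With the private `needsConversion = false` overload gone, any caller that used to skip this step (as the two tests above did) now goes through the same checked path.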
 

