Posted to commits@spark.apache.org by rx...@apache.org on 2017/01/17 23:06:37 UTC

spark git commit: [SPARK-18917][SQL] Remove schema check in appending data

Repository: spark
Updated Branches:
  refs/heads/master fee20df14 -> 83dff87de


[SPARK-18917][SQL] Remove schema check in appending data

## What changes were proposed in this pull request?
In append mode, we check whether the schema of the write is compatible with the schema of the existing data. Inferring the existing schema from the files can be a significant performance issue in cloud environments. This patch removes the check.

Note that for catalog tables, we always do the check, as discussed in https://github.com/apache/spark/pull/16339#discussion_r96208357
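
As a rough illustration (not part of the patch itself), the sketch below shows the two append paths discussed above; the output path `/tmp/events` and the table name `events_tbl` are hypothetical:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object AppendExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("append-example").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "2017-01-17"), (2, "2017-01-18")).toDF("id", "dt")

    // Path-based append: with this change, Spark no longer lists and infers the
    // existing files' schema just to verify the requested partitioning, which
    // avoids expensive file listing on cloud object stores.
    df.write
      .mode(SaveMode.Append)
      .partitionBy("dt")
      .parquet("/tmp/events")

    // Catalog-table append: the check is still performed, since the table's
    // schema and partitioning come from the catalog without touching the files.
    df.write
      .mode(SaveMode.Append)
      .partitionBy("dt")
      .saveAsTable("events_tbl")

    spark.stop()
  }
}
```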

## How was this patch tested?
N/A

Closes #16339.

Author: Reynold Xin <rx...@databricks.com>

Closes #16622 from rxin/SPARK-18917.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83dff87d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83dff87d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83dff87d

Branch: refs/heads/master
Commit: 83dff87dedd66fcad13f1b54899c1c56ab1536b6
Parents: fee20df
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jan 17 15:06:28 2017 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jan 17 15:06:28 2017 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 36 ++------------------
 1 file changed, 3 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83dff87d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 29afe57..ecfcafe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -104,20 +104,12 @@ case class DataSource(
    *     dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    *     this information, therefore calls to this method should be very cheap, i.e. there won't
    *     be any further inference in any triggers.
-   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
-   *     existing table's partitioning scheme. This is achieved by not providing
-   *     `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
-   *     exit, if we don't care about the schema of the original table.
    *
    * @param format the file format object for this DataSource
-   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
-   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
-   *         `null`.
+   *         columns.
    */
-  private def getOrInferFileFormatSchema(
-      format: FileFormat,
-      justPartitioning: Boolean = false): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -174,9 +166,7 @@ case class DataSource(
         StructType(partitionFields)
       }
     }
-    if (justPartitioning) {
-      return (null, partitionSchema)
-    }
+
     val dataSchema = userSpecifiedSchema.map { schema =>
       val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
@@ -434,26 +424,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
-    // If we are appending to a table that already exists, make sure the partitioning matches
-    // up.  If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-             |Existing partitioning columns:
-             |  ${existingPartitionColumns.mkString(", ")}
-             |Requested partitioning columns:
-             |  ${partitionColumns.mkString(", ")}
-             |""".stripMargin)
-      }
-    }
-
     // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
     // not need to have the query as child, to avoid to analyze an optimized query,
     // because InsertIntoHadoopFsRelationCommand will be optimized first.

