You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/11/24 00:15:39 UTC

spark git commit: [SPARK-18510][SQL] Follow up to address comments in #15951

Repository: spark
Updated Branches:
  refs/heads/master 0d1bf2b6c -> 223fa218e


[SPARK-18510][SQL] Follow up to address comments in #15951

## What changes were proposed in this pull request?

This PR addresses the remaining review comments from #15951.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #15997 from zsxwing/SPARK-18510-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/223fa218
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/223fa218
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/223fa218

Branch: refs/heads/master
Commit: 223fa218e1f637f0d62332785a3bee225b65b990
Parents: 0d1bf2b
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Nov 23 16:15:35 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Nov 23 16:15:35 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 35 +++++++++++---------
 1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/223fa218/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dbc3e71..ccfc759 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -118,8 +118,10 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       justPartitioning: Boolean = false): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to
-    lazy val tempFileCatalog = {
+    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = {
       val allPaths = caseInsensitiveOptions.get("path") ++ paths
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@ case class DataSource(
     val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
         val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@ case class DataSource(
       }
       StructType(resolved)
     } else {
-      // in streaming mode, we have already inferred and registered partition columns, we will
-      // never have to materialize the lazy val below
-      lazy val inferredPartitions = tempFileCatalog.partitionSchema
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
       if (userSpecifiedSchema.isEmpty) {
+        val inferredPartitions = tempFileIndex.partitionSchema
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
-            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+          val equality = sparkSession.sessionState.conf.resolver
+          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+            val inferredPartitions = tempFileIndex.partitionSchema
+            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
                 s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@ case class DataSource(
                    |Falling back to inferred dataType if it exists.
                  """.stripMargin)
             }
-            inferredPartitions.find(_.name == partitionColumn)
+            inferredOpt
           }.getOrElse {
             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
               s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@ case class DataSource(
       format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        tempFileCatalog.allFiles())
+        tempFileIndex.allFiles())
     }.getOrElse {
       throw new AnalysisException(
         s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@ case class DataSource(
               "you may be able to create a static DataFrame on that directory with " +
               "'spark.read.load(directory)' and infer schema from it.")
         }
-        val (schema, partCols) = getOrInferFileFormatSchema(format)
-        SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        SourceInfo(
+          s"FileSource[$path]",
+          StructType(dataSchema ++ partitionSchema),
+          partitionSchema.fieldNames)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@ case class DataSource(
           globPath
         }.toArray
 
-        val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
         }
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = inferredPartitionSchema,
+          partitionSchema = partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org