You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/11/24 00:15:39 UTC
spark git commit: [SPARK-18510][SQL] Follow up to address comments in #15951
Repository: spark
Updated Branches:
refs/heads/master 0d1bf2b6c -> 223fa218e
[SPARK-18510][SQL] Follow up to address comments in #15951
## What changes were proposed in this pull request?
This PR addresses the remaining review comments in #15951.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15997 from zsxwing/SPARK-18510-follow-up.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/223fa218
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/223fa218
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/223fa218
Branch: refs/heads/master
Commit: 223fa218e1f637f0d62332785a3bee225b65b990
Parents: 0d1bf2b
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Nov 23 16:15:35 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Nov 23 16:15:35 2016 -0800
----------------------------------------------------------------------
.../sql/execution/datasources/DataSource.scala | 35 +++++++++++---------
1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/223fa218/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dbc3e71..ccfc759 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -118,8 +118,10 @@ case class DataSource(
private def getOrInferFileFormatSchema(
format: FileFormat,
justPartitioning: Boolean = false): (StructType, StructType) = {
- // the operations below are expensive therefore try not to do them if we don't need to
- lazy val tempFileCatalog = {
+ // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+ // in streaming mode, we have already inferred and registered partition columns, we will
+ // never have to materialize the lazy val below
+ lazy val tempFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,7 +135,7 @@ case class DataSource(
val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
- val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+ val resolved = tempFileIndex.partitionSchema.map { partitionField =>
val equality = sparkSession.sessionState.conf.resolver
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
@@ -141,17 +143,17 @@ case class DataSource(
}
StructType(resolved)
} else {
- // in streaming mode, we have already inferred and registered partition columns, we will
- // never have to materialize the lazy val below
- lazy val inferredPartitions = tempFileCatalog.partitionSchema
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
if (userSpecifiedSchema.isEmpty) {
+ val inferredPartitions = tempFileIndex.partitionSchema
inferredPartitions
} else {
val partitionFields = partitionColumns.map { partitionColumn =>
- userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
- val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+ val equality = sparkSession.sessionState.conf.resolver
+ userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+ val inferredPartitions = tempFileIndex.partitionSchema
+ val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
if (inferredOpt.isDefined) {
logDebug(
s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@ case class DataSource(
|Falling back to inferred dataType if it exists.
""".stripMargin)
}
- inferredPartitions.find(_.name == partitionColumn)
+ inferredOpt
}.getOrElse {
throw new AnalysisException(s"Failed to resolve the schema for $format for " +
s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- tempFileCatalog.allFiles())
+ tempFileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
- val (schema, partCols) = getOrInferFileFormatSchema(format)
- SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+ SourceInfo(
+ s"FileSource[$path]",
+ StructType(dataSchema ++ partitionSchema),
+ partitionSchema.fieldNames)
case _ =>
throw new UnsupportedOperationException(
@@ -379,7 +384,7 @@ case class DataSource(
globPath
}.toArray
- val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@ case class DataSource(
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
- new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+ new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
}
HadoopFsRelation(
fileCatalog,
- partitionSchema = inferredPartitionSchema,
+ partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org