You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/03 07:54:08 UTC
spark git commit: [SPARK-18726][SQL] resolveRelation for FileFormat
DataSource doesn't need to listFiles twice
Repository: spark
Updated Branches:
refs/heads/master e24f21b5f -> 982f3223b
[SPARK-18726][SQL] resolveRelation for FileFormat DataSource doesn't need to listFiles twice
## What changes were proposed in this pull request?
Currently, when `resolveRelation` is called for a `FileFormat` DataSource without a user-provided schema, `listFiles` is executed twice in `InMemoryFileIndex`.
This PR adds a `FileStatusCache` to DataSource, shared by both `InMemoryFileIndex` instances, so that the files are listed only once.
However, there is a bug in `InMemoryFileIndex`; see:
[SPARK-19748](https://github.com/apache/spark/pull/17079)
[SPARK-19761](https://github.com/apache/spark/pull/17093)
so this PR should be merged after SPARK-19748 / SPARK-19761.
## How was this patch tested?
A unit test was added to `PartitionedTablePerfStatsSuite`.
Author: windpiger <so...@outlook.com>
Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/982f3223
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/982f3223
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/982f3223
Branch: refs/heads/master
Commit: 982f3223b4f55f988091402063fe8746c5e2cee4
Parents: e24f21b
Author: windpiger <so...@outlook.com>
Authored: Thu Mar 2 23:54:01 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 2 23:54:01 2017 -0800
----------------------------------------------------------------------
.../spark/sql/execution/datasources/DataSource.scala | 13 +++++++++----
.../sql/hive/PartitionedTablePerfStatsSuite.scala | 11 +++++++++++
2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c1353d4..4947dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,10 +106,13 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
+ * @param fileStatusCache the shared cache for file statuses to speed up listing
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
- private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+ private def getOrInferFileFormatSchema(
+ format: FileFormat,
+ fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
@@ -122,7 +125,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+ new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
}
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -354,7 +357,8 @@ case class DataSource(
globPath
}.toArray
- val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+ val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -364,7 +368,8 @@ case class DataSource(
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
} else {
- new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+ new InMemoryFileIndex(
+ sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
}
HadoopFsRelation(
http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b792a16..5050619 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -411,4 +411,15 @@ class PartitionedTablePerfStatsSuite
}
}
}
+
+ test("resolveRelation for a FileFormat DataSource without userSchema scan filesystem only once") {
+ withTempDir { dir =>
+ import spark.implicits._
+ Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
+ HiveCatalogMetrics.reset()
+ spark.read.parquet(dir.getAbsolutePath)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org