Posted to commits@spark.apache.org by we...@apache.org on 2017/03/03 07:54:08 UTC

spark git commit: [SPARK-18726][SQL] resolveRelation for FileFormat DataSource doesn't need to call listFiles twice

Repository: spark
Updated Branches:
  refs/heads/master e24f21b5f -> 982f3223b


[SPARK-18726][SQL] resolveRelation for FileFormat DataSource doesn't need to call listFiles twice

## What changes were proposed in this pull request?

Currently, when we call `resolveRelation` for a `FileFormat DataSource` without providing a user schema, `listFiles` is executed twice in `InMemoryFileIndex`.
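
The duplicate listing can be observed through the Hive catalog metrics used by the test suite below (a minimal sketch, assuming a running `SparkSession` named `spark` and an existing Parquet directory; the path is illustrative):

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

HiveCatalogMetrics.reset()
// Reading without a user-provided schema triggers schema inference,
// which lists the files under the path during resolveRelation.
spark.read.parquet("/tmp/example-parquet")
// Before this patch each file is discovered twice; after it, once.
println(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount())
```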

This PR adds a `FileStatusCache` to `DataSource`, so the results of the first listing are reused and `listFiles` is not executed twice; the change is sketched below.
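
Condensed from the diff below, the shape of the change is to build one cache and thread it through both `InMemoryFileIndex` constructions (a sketch of the surrounding `resolveRelation` code, not a standalone program; `sparkSession`, `globbedPaths`, `options`, and `partitionSchema` are the values already in scope there):

```scala
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

// One shared cache for the whole resolveRelation call.
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

// First construction, used for schema inference: performs the real listing
// and populates the cache.
val inferenceIndex =
  new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)

// Second construction, used for the final relation: file statuses are served
// from the shared cache, so the filesystem is not listed again.
val fileCatalog =
  new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
```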

However, there is a bug in `InMemoryFileIndex`; see
 [SPARK-19748](https://github.com/apache/spark/pull/17079) and
 [SPARK-19761](https://github.com/apache/spark/pull/17093),
so this PR should land after SPARK-19748 and SPARK-19761.

## How was this patch tested?
A unit test was added.

Author: windpiger <so...@outlook.com>

Closes #17081 from windpiger/resolveDataSourceScanFilesTwice.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/982f3223
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/982f3223
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/982f3223

Branch: refs/heads/master
Commit: 982f3223b4f55f988091402063fe8746c5e2cee4
Parents: e24f21b
Author: windpiger <so...@outlook.com>
Authored: Thu Mar 2 23:54:01 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 2 23:54:01 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/DataSource.scala   | 13 +++++++++----
 .../sql/hive/PartitionedTablePerfStatsSuite.scala      | 11 +++++++++++
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c1353d4..4947dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -106,10 +106,13 @@ case class DataSource(
    *     be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
+   * @param fileStatusCache the shared cache for file statuses to speed up listing
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
-  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -122,7 +125,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -354,7 +357,8 @@ case class DataSource(
           globPath
         }.toArray
 
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
+        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -364,7 +368,8 @@ case class DataSource(
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
         } else {
-          new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
+          new InMemoryFileIndex(
+            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
         }
 
         HadoopFsRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/982f3223/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index b792a16..5050619 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -411,4 +411,15 @@ class PartitionedTablePerfStatsSuite
       }
     }
   }
+
+  test("resolveRelation for a FileFormat DataSource without userSchema scan filesystem only once") {
+    withTempDir { dir =>
+      import spark.implicits._
+      Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
+      HiveCatalogMetrics.reset()
+      spark.read.parquet(dir.getAbsolutePath)
+      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+    }
+  }
 }

