You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/15 07:42:09 UTC

spark git commit: [SPARK-13207][SQL][BRANCH-1.6] Make partitioning discovery ignore _SUCCESS files.

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 589d0420a -> 6935b5080


[SPARK-13207][SQL][BRANCH-1.6] Make partitioning discovery ignore _SUCCESS files.

If a _SUCCESS appears in the inner partitioning dir, partition discovery will treat that _SUCCESS file as a data file. Then, partition discovery will fail because it finds that the dir structure is not valid. We should ignore those `_SUCCESS` files.

In future, it is better to ignore all files/dirs starting with `_` or `.`. This PR does not make this change. I am thinking about making this change simple, so we can consider of getting it in branch 1.6.

To ignore all files/dirs starting with `_` or `, the main change is to let ParquetRelation have another way to get metadata files. Right now, it relies on FileStatusCache's cachedLeafStatuses, which returns file statuses of both metadata files (e.g. metadata files used by parquet) and data files, which requires more changes.

https://issues.apache.org/jira/browse/SPARK-13207

Author: Yin Huai <yh...@databricks.com>

Closes #11697 from yhuai/SPARK13207_branch16.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6935b508
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6935b508
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6935b508

Branch: refs/heads/branch-1.6
Commit: 6935b50801cb10e9e5b7583931cdc972e8981793
Parents: 589d042
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Mar 14 23:42:05 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Mar 14 23:42:05 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 30 ++++++++++++++------
 .../ParquetPartitionDiscoverySuite.scala        | 23 +++++++++++++++
 2 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6935b508/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 736517b..ce5f3dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -459,7 +459,7 @@ abstract class HadoopFsRelation private[sql](
           }
         }.filterNot { status =>
           val name = status.getPath.getName
-          name.toLowerCase == "_temporary" || name.startsWith(".")
+          HadoopFsRelation.shouldFilterOut(name)
         }
 
         val (dirs, files) = statuses.partition(_.isDir)
@@ -843,6 +843,16 @@ abstract class HadoopFsRelation private[sql](
 }
 
 private[sql] object HadoopFsRelation extends Logging {
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // TODO: We should try to filter out all files/dirs starting with "." or "_".
+    // The only reason that we are not doing it now is that Parquet needs to find those
+    // metadata files from leaf files returned by this methods. We should refactor
+    // this logic to not mix metadata files with data files.
+    pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+  }
+
   // We don't filter files/directories whose name start with "_" except "_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
@@ -851,19 +861,21 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
-    if (name == "_temporary" || name.startsWith(".")) {
+    if (shouldFilterOut(name)) {
       Array.empty
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(fs.getConf, this.getClass())
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      if (pathFilter != null) {
-        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      }
+      val statuses =
+        if (pathFilter != null) {
+          val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        }
+      statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6935b508/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 71e9034..6dc855c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -704,6 +704,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }
 
+  test("_SUCCESS should not break partitioning discovery") {
+    Seq(1, 32).foreach { threshold =>
+      // We have two paths to list files, one at driver side, another one that we use
+      // a Spark job. We need to test both ways.
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold.toString) {
+        withTempPath { dir =>
+          val tablePath = new File(dir, "table")
+          val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+          df.write
+            .format("parquet")
+            .partitionBy("b", "c", "d")
+            .save(tablePath.getCanonicalPath)
+
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS"))
+          checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+        }
+      }
+    }
+  }
+
   test("listConflictingPartitionColumns") {
     def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
       val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org