You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/14 00:40:34 UTC

[spark] branch branch-3.1 updated: [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 79cef81  [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
79cef81 is described below

commit 79cef81d35e60e8f2f107947c024b5be8e3f9d4c
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Jan 14 09:39:38 2021 +0900

    [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
    
    ### What changes were proposed in this pull request?
    
    Fix a regression from https://github.com/apache/spark/pull/29959.
    
    In Spark, file paths whose names match either of the following rules are considered hidden and are ignored on file reads:
    1. the name starts with "_" and doesn't contain "="
    2. the name starts with "."
    
    However, after the refactoring PR https://github.com/apache/spark/pull/29959, the hidden paths are not filtered out on partition inference: https://github.com/apache/spark/pull/29959/files#r556432426
    
    This PR fixes the bug. To achieve this, the method `InMemoryFileIndex.shouldFilterOut` is moved to `HadoopFSUtils` and renamed to `shouldFilterOutPathName`.
    
    ### Why are the changes needed?
    
    Bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes a bug for reading file paths with partitions.
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #31169 from gengliangwang/fileListingBug.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 467d7589737d2430d09f1ffbd33bf801d179f990)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/util/HadoopFSUtils.scala      | 19 ++++++++++++-
 .../org/apache/spark/util/HadoopFSUtilsSuite.scala | 33 ++++++++++++++++++++++
 .../sql/execution/datasources/DataSource.scala     |  4 +--
 .../execution/datasources/InMemoryFileIndex.scala  | 15 +---------
 .../sql/execution/datasources/FileIndexSuite.scala | 26 +++++++++--------
 5 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 4af48d5..60a73ad 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -249,8 +249,11 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
+    val filteredStatuses =
+      statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
           parallelListLeafFilesInternal(
@@ -350,4 +353,18 @@ private[spark] object HadoopFSUtils extends Logging {
     modificationTime: Long,
     accessTime: Long,
     blockLocations: Array[SerializableBlockLocation])
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOutPathName(pathName: String): Boolean = {
+    // We filter out the following paths:
+    // 1. everything that starts with `.`, or with `_` and no `=`, except _common_metadata and
+    // _metadata, because Parquet needs to find those metadata files from leaf files returned by
+    // this method. We should refactor this logic to not mix metadata files with data files.
+    // 2. everything that ends with `._COPYING_`, because this is an intermediate state of a file.
+    // We should skip this file to avoid double reading.
+    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+    exclude && !include
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
new file mode 100644
index 0000000..ba91eab
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopFSUtilsSuite extends SparkFunSuite {
+  test("HadoopFSUtils - file filtering") {
+    assert(!HadoopFSUtils.shouldFilterOutPathName("abcd")) // ordinary name: kept
+    assert(HadoopFSUtils.shouldFilterOutPathName(".ab")) // leading dot: filtered
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd")) // leading underscore, no `=`: filtered
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_metadata")) // metadata name: kept
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_common_metadata")) // metadata name: kept
+    assert(HadoopFSUtils.shouldFilterOutPathName("_ab_metadata")) // no metadata prefix: filtered
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd_common_metadata")) // same: filtered
+    assert(HadoopFSUtils.shouldFilterOutPathName("a._COPYING_")) // `._COPYING_` suffix: filtered
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4783789..e84f594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -811,7 +811,7 @@ object DataSource extends Logging {
     val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
       val (filteredOut, filteredIn) = allPaths.partition { path =>
-        InMemoryFileIndex.shouldFilterOut(path.getName)
+        HadoopFSUtils.shouldFilterOutPathName(path.getName)
       }
       if (filteredIn.isEmpty) {
         logWarning(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 2127595..6c3deee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -158,23 +158,10 @@ object InMemoryFileIndex extends Logging {
       parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
   }
 
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter follow paths:
-    // 1. everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
-    // should skip this file in case of double reading.
-    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
-      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
-    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
-    exclude && !include
-  }
 }
 
 private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
   override def accept(path: Path): Boolean = {
-    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+    (filter == null || filter.accept(path)) && !HadoopFSUtils.shouldFilterOutPathName(path.getName)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 02be8c9..fcaf8df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -297,17 +297,6 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
-  test("InMemoryFileIndex - file filtering") {
-    assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
-    assert(InMemoryFileIndex.shouldFilterOut(".ab"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
-  }
-
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
@@ -416,6 +405,21 @@ class FileIndexSuite extends SharedSparkSession {
     fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
   }
 
+  test("SPARK-34075: InMemoryFileIndex filters out hidden file on partition inference") {
+    withTempPath { path =>
+      spark
+        .range(2) // two rows, ids 0 and 1
+        .select(col("id").as("p"), col("id"))
+        .write
+        .partitionBy("p") // one directory per value of p
+        .parquet(path.getAbsolutePath)
+      val targetPath = new File(path, "p=1")
+      val hiddenPath = new File(path, "_hidden_path")
+      targetPath.renameTo(hiddenPath) // turn partition p=1 into a hidden directory
+      assert(spark.read.parquet(path.getAbsolutePath).count() == 1L) // hidden dir is not read
+    }
+  }
+
   test("SPARK-20367 - properly unescape column names in inferPartitioning") {
     withTempPath { path =>
       val colToUnescape = "Column/#%'?"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org