Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/14 00:40:34 UTC
[spark] branch branch-3.1 updated: [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 79cef81 [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
79cef81 is described below
commit 79cef81d35e60e8f2f107947c024b5be8e3f9d4c
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Jan 14 09:39:38 2021 +0900
[SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference
### What changes were proposed in this pull request?
Fix a regression from https://github.com/apache/spark/pull/29959.
In Spark, the following file paths are considered hidden and are ignored on file reads (see the sketch after this list):
1. paths whose name starts with "_" and doesn't contain "="
2. paths whose name starts with "."
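For illustration, the two rules amount to a predicate like the following sketch (the name `looksHidden` is illustrative; the real check, shown in the diff below, additionally keeps Parquet's _metadata files and skips `._COPYING_` files):

    // Sketch only; the actual check is HadoopFSUtils.shouldFilterOutPathName below.
    def looksHidden(pathName: String): Boolean =
      (pathName.startsWith("_") && !pathName.contains("=")) || // rule 1
        pathName.startsWith(".")                               // rule 2

    looksHidden("_SUCCESS") // true: starts with "_" and contains no "="
    looksHidden("p=1")      // false: "=" marks a partition directory
    looksHidden(".crc")     // true: starts with "."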
However, after the refactoring PR https://github.com/apache/spark/pull/29959, hidden paths are no longer filtered out during partition inference: https://github.com/apache/spark/pull/29959/files#r556432426
This PR fixes the bug. To achieve that, the method `InMemoryFileIndex.shouldFilterOut` is refactored into `HadoopFSUtils.shouldFilterOutPathName`.
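As a quick sketch of the relocated helper's behavior (the expected values follow from the method and the tests in the diff below; "_year=2021" is an illustrative extra case):

    import org.apache.spark.util.HadoopFSUtils

    HadoopFSUtils.shouldFilterOutPathName("_cd")         // true: hidden
    HadoopFSUtils.shouldFilterOutPathName("_year=2021")  // false: "=" marks a partition dir
    HadoopFSUtils.shouldFilterOutPathName("_metadata")   // false: Parquet metadata is kept
    HadoopFSUtils.shouldFilterOutPathName("a._COPYING_") // true: in-flight copy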
### Why are the changes needed?
Bugfix
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a bug in reading partitioned file paths.
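For example, a read like the following sketch now succeeds even when a hidden directory sits next to the partition directories (this mirrors the regression test added below; `spark` is an active SparkSession and the /tmp/t path is illustrative):

    import java.io.File
    import org.apache.spark.sql.functions.col

    spark.range(2)
      .select(col("id").as("p"), col("id"))
      .write.partitionBy("p")
      .parquet("/tmp/t")

    // Simulate a hidden directory alongside the partition directories.
    new File("/tmp/t/p=1").renameTo(new File("/tmp/t/_hidden_path"))

    // Before this fix, partition inference listed _hidden_path and the read failed;
    // with the fix, the hidden directory is skipped and only p=0 remains.
    assert(spark.read.parquet("/tmp/t").count() == 1L)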
### How was this patch tested?
Unit test
Closes #31169 from gengliangwang/fileListingBug.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 467d7589737d2430d09f1ffbd33bf801d179f990)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../org/apache/spark/util/HadoopFSUtils.scala | 19 ++++++++++++-
.../org/apache/spark/util/HadoopFSUtilsSuite.scala | 33 ++++++++++++++++++++++
.../sql/execution/datasources/DataSource.scala | 4 +--
.../execution/datasources/InMemoryFileIndex.scala | 15 +---------
.../sql/execution/datasources/FileIndexSuite.scala | 26 +++++++++--------
5 files changed, 69 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 4af48d5..60a73ad 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -249,8 +249,11 @@ private[spark] object HadoopFSUtils extends Logging {
Array.empty[FileStatus]
}
+ val filteredStatuses =
+ statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+
val allLeafStatuses = {
- val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+ val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFilesInternal(
@@ -350,4 +353,18 @@ private[spark] object HadoopFSUtils extends Logging {
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOutPathName(pathName: String): Boolean = {
+ // We filter out the following paths:
+ // 1. everything that starts with _ or ., except _common_metadata and _metadata,
+ // because Parquet needs to find those metadata files among the leaf files returned by
+ // this method. We should refactor this logic to not mix metadata files with data files.
+ // 2. everything that ends with `._COPYING_`, because it is an intermediate state of a
+ // file being copied; we should skip such files to avoid reading them twice.
+ val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+ pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+ val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+ exclude && !include
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
new file mode 100644
index 0000000..ba91eab
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopFSUtilsSuite extends SparkFunSuite {
+ test("HadoopFSUtils - file filtering") {
+ assert(!HadoopFSUtils.shouldFilterOutPathName("abcd"))
+ assert(HadoopFSUtils.shouldFilterOutPathName(".ab"))
+ assert(HadoopFSUtils.shouldFilterOutPathName("_cd"))
+ assert(!HadoopFSUtils.shouldFilterOutPathName("_metadata"))
+ assert(!HadoopFSUtils.shouldFilterOutPathName("_common_metadata"))
+ assert(HadoopFSUtils.shouldFilterOutPathName("_ab_metadata"))
+ assert(HadoopFSUtils.shouldFilterOutPathName("_cd_common_metadata"))
+ assert(HadoopFSUtils.shouldFilterOutPathName("a._COPYING_"))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4783789..e84f594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
/**
* The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -811,7 +811,7 @@ object DataSource extends Logging {
val allPaths = globbedPaths ++ nonGlobPaths
if (checkFilesExist) {
val (filteredOut, filteredIn) = allPaths.partition { path =>
- InMemoryFileIndex.shouldFilterOut(path.getName)
+ HadoopFSUtils.shouldFilterOutPathName(path.getName)
}
if (filteredIn.isEmpty) {
logWarning(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 2127595..6c3deee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -158,23 +158,10 @@ object InMemoryFileIndex extends Logging {
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
}
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter out the following paths:
- // 1. everything that starts with _ or ., except _common_metadata and _metadata,
- // because Parquet needs to find those metadata files among the leaf files returned by
- // this method. We should refactor this logic to not mix metadata files with data files.
- // 2. everything that ends with `._COPYING_`, because it is an intermediate state of a
- // file being copied; we should skip such files to avoid reading them twice.
- val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
- pathName.startsWith(".") || pathName.endsWith("._COPYING_")
- val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
- exclude && !include
- }
}
private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
override def accept(path: Path): Boolean = {
- (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+ (filter == null || filter.accept(path)) && !HadoopFSUtils.shouldFilterOutPathName(path.getName)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 02be8c9..fcaf8df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -297,17 +297,6 @@ class FileIndexSuite extends SharedSparkSession {
}
}
- test("InMemoryFileIndex - file filtering") {
- assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
- assert(InMemoryFileIndex.shouldFilterOut(".ab"))
- assert(InMemoryFileIndex.shouldFilterOut("_cd"))
- assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
- assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
- assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
- assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
- assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
- }
-
test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
@@ -416,6 +405,21 @@ class FileIndexSuite extends SharedSparkSession {
fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
}
+ test("SPARK-34075: InMemoryFileIndex filters out hidden file on partition inference") {
+ withTempPath { path =>
+ spark
+ .range(2)
+ .select(col("id").as("p"), col("id"))
+ .write
+ .partitionBy("p")
+ .parquet(path.getAbsolutePath)
+ val targetPath = new File(path, "p=1")
+ val hiddenPath = new File(path, "_hidden_path")
+ targetPath.renameTo(hiddenPath)
+ assert(spark.read.parquet(path.getAbsolutePath).count() == 1L)
+ }
+ }
+
test("SPARK-20367 - properly unescape column names in inferPartitioning") {
withTempPath { path =>
val colToUnescape = "Column/#%'?"