You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/13 21:16:39 UTC

spark git commit: [SPARK-17661][SQL] Consolidate various listLeafFiles implementations

Repository: spark
Updated Branches:
  refs/heads/master 7106866c2 -> adc112429


[SPARK-17661][SQL] Consolidate various listLeafFiles implementations

## What changes were proposed in this pull request?
There are 3 listLeafFiles-related functions in Spark:

- ListingFileCatalog.listLeafFiles (which calls HadoopFsRelation.listLeafFilesInParallel if the number of paths passed in is greater than a threshold; if it is lower, then it has its own serial version implemented)
- HadoopFsRelation.listLeafFiles (called only by HadoopFsRelation.listLeafFilesInParallel)
- HadoopFsRelation.listLeafFilesInParallel (called only by ListingFileCatalog.listLeafFiles)

It is actually very confusing and error-prone because there are effectively two distinct implementations for the serial version of listing leaf files. As an example, SPARK-17599 updated only one of the code paths and ignored the other one.

This code can be improved by:

- Moving all file listing code into ListingFileCatalog, since it is the only class that needs it.
- Keeping only one function for listing files in serial.

## How was this patch tested?
This change should be covered by existing unit and integration tests. I also moved a test case for HadoopFsRelation.shouldFilterOut from HadoopFsRelationSuite to ListingFileCatalogSuite.

Author: petermaxlee <pe...@gmail.com>

Closes #15235 from petermaxlee/SPARK-17661.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adc11242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adc11242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adc11242

Branch: refs/heads/master
Commit: adc112429d6fe671e6e8294824a0e41a2b1ec2e0
Parents: 7106866
Author: petermaxlee <pe...@gmail.com>
Authored: Thu Oct 13 14:16:39 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Oct 13 14:16:39 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 231 ++++++++++++++-----
 .../datasources/fileSourceInterfaces.scala      | 154 -------------
 .../datasources/HadoopFsRelationSuite.scala     |  11 -
 .../datasources/ListingFileCatalogSuite.scala   |  34 +++
 4 files changed, 206 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 3253208..a68ae52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 
 /**
@@ -82,73 +85,183 @@ class ListingFileCatalog(
    * This is publicly visible for testing.
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
-    } else {
-      // Right now, the number of paths is less than the value of
-      // parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
-      // If there is any child that has more files than the threshold, we will use parallel
-      // listing.
-
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(hadoopConf, this.getClass)
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-
-      val statuses: Seq[FileStatus] = paths.flatMap { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        logTrace(s"Listing $path on driver")
-
-        val childStatuses = {
-          try {
-            val stats = fs.listStatus(path)
-            if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-          } catch {
-            case _: FileNotFoundException =>
-              logWarning(s"The directory $path was not found. Was it deleted very recently?")
-              Array.empty[FileStatus]
-          }
-        }
+    val files =
+      if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+        ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+      } else {
+        ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+      }
+
+    mutable.LinkedHashSet(files: _*)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = paths.toSet.hashCode()
+}
+
+
+object ListingFileCatalog extends Logging {
+
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of path recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[FileStatus] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      listLeafFiles0(fs, path, filter)
+    }
+  }
 
-        childStatuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-          //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-          //   paths exceeds threshold.
-          case f =>
-            if (f.isDirectory ) {
-              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
-              f
-            } else {
-              HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+  /**
+   * List a collection of path recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[FileStatus] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Set the number of parallelism to prevent following file listing from generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+      }.map { status =>
+        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+        val blockLocations = status match {
+          case f: LocatedFileStatus =>
+            f.getBlockLocations.map { loc =>
+              SerializableBlockLocation(
+                loc.getNames,
+                loc.getHosts,
+                loc.getOffset,
+                loc.getLength)
             }
+
+          case _ =>
+            Array.empty[SerializableBlockLocation]
         }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
 
-      val (dirs, files) = statuses.partition(_.isDirectory)
+        SerializableFileStatus(
+          status.getPath.toString,
+          status.getLen,
+          status.isDirectory,
+          status.getReplication,
+          status.getBlockSize,
+          status.getModificationTime,
+          status.getAccessTime,
+          blockLocations)
+      }.collect()
 
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+    // Turn SerializableFileStatus back to Status
+    statuses.map { f =>
+      val blockLocations = f.blockLocations.map { loc =>
+        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
       }
+      new LocatedFileStatus(
+        new FileStatus(
+          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+        blockLocations)
     }
   }
 
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
-    case _ => false
+  /**
+   * List a single path, provided as a FileStatus, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing $path")
+    val name = path.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+      // Note that statuses only include FileStatus for the files and dirs directly under path,
+      // and does not include anything else recursively.
+      val statuses = try fs.listStatus(path) catch {
+        case _: FileNotFoundException =>
+          logWarning(s"The directory $path was not found. Was it deleted very recently?")
+          Array.empty[FileStatus]
+      }
+
+      val allLeafStatuses = {
+        val (dirs, files) = statuses.partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      }
+
+      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+        //   paths exceeds threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+          // which is very slow on some file system (RawLocalFileSystem, which is launch a
+          // subprocess and parse the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
   }
 
-  override def hashCode(): Int = paths.toSet.hashCode()
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // because Parquet needs to find those metadata files from leaf files returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 5cc5f32..69dd622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -35,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 /**
  * ::Experimental::
@@ -352,152 +347,3 @@ trait FileCatalog {
   /** Refresh the file listing */
   def refresh(): Unit
 }
-
-
-/**
- * Helper methods for gathering metadata from HDFS.
- */
-object HadoopFsRelation extends Logging {
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
-  }
-
-  /**
-   * Create a LocatedFileStatus using FileStatus and block locations.
-   */
-  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
-    // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
-    // very slow on some file system (RawLocalFileSystem, which is launch a subprocess and parse the
-    // stdout).
-    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-    if (f.isSymlink) {
-      lfs.setSymlink(f.getSymlink)
-    }
-    lfs
-  }
-
-  // We don't filter files/directories whose name start with "_" except "_temporary" here, as
-  // specific data sources may take advantages over them (e.g. Parquet _metadata and
-  // _common_metadata files). "_temporary" directories are explicitly ignored since failed
-  // tasks/jobs may leave partial/corrupted data files there.  Files and directories whose name
-  // start with "." are also ignored.
-  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
-    logTrace(s"Listing ${status.getPath}")
-    val name = status.getPath.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Array.empty[FileStatus]
-    } else {
-      val statuses = {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
-      }
-      // statuses do not have any dirs.
-      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-        //   paths exceeds threshold.
-        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }
-    }
-  }
-
-  // `FileStatus` is Writable but not serializable.  What make it worse, somehow it doesn't play
-  // well with `SerializableWritable`.  So there seems to be no way to serialize a `FileStatus`.
-  // Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it from
-  // executor side and reconstruct it on driver side.
-  case class FakeBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  case class FakeFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[FakeBlockLocation])
-
-  def listLeafFilesInParallel(
-      paths: Seq[Path],
-      hadoopConf: Configuration,
-      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
-    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-
-    // Set the number of parallelism to prevent following file listing from generating many tasks
-    // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
-
-    val fakeStatuses = sparkContext
-        .parallelize(serializedPaths, numParallelism)
-        .mapPartitions { paths =>
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
-      val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      paths.map(new Path(_)).flatMap { path =>
-        val fs = path.getFileSystem(serializableConfiguration.value)
-        listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-      }
-    }.map { status =>
-      val blockLocations = status match {
-        case f: LocatedFileStatus =>
-          f.getBlockLocations.map { loc =>
-            FakeBlockLocation(
-              loc.getNames,
-              loc.getHosts,
-              loc.getOffset,
-              loc.getLength)
-          }
-
-        case _ =>
-          Array.empty[FakeBlockLocation]
-      }
-
-      FakeFileStatus(
-        status.getPath.toString,
-        status.getLen,
-        status.isDirectory,
-        status.getReplication,
-        status.getBlockSize,
-        status.getModificationTime,
-        status.getAccessTime,
-        blockLocations)
-    }.collect()
-
-    val hadoopFakeStatuses = fakeStatuses.map { f =>
-      val blockLocations = f.blockLocations.map { loc =>
-        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-      }
-      new LocatedFileStatus(
-        new FileStatus(
-          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
-        blockLocations)
-    }
-    mutable.LinkedHashSet(hadoopFakeStatuses: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 3c68dc8..89d5765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -39,15 +39,4 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.statistics.sizeInBytes === BigInt(totalSize))
     }
   }
-
-  test("file filtering") {
-    assert(!HadoopFsRelation.shouldFilterOut("abcd"))
-    assert(HadoopFsRelation.shouldFilterOut(".ab"))
-    assert(HadoopFsRelation.shouldFilterOut("_cd"))
-
-    assert(!HadoopFsRelation.shouldFilterOut("_metadata"))
-    assert(!HadoopFsRelation.shouldFilterOut("_common_metadata"))
-    assert(HadoopFsRelation.shouldFilterOut("_ab_metadata"))
-    assert(HadoopFsRelation.shouldFilterOut("_cd_common_metadata"))
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/adc11242/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
new file mode 100644
index 0000000..f15730a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class ListingFileCatalogSuite extends SparkFunSuite {
+
+  test("file filtering") {
+    assert(!ListingFileCatalog.shouldFilterOut("abcd"))
+    assert(ListingFileCatalog.shouldFilterOut(".ab"))
+    assert(ListingFileCatalog.shouldFilterOut("_cd"))
+
+    assert(!ListingFileCatalog.shouldFilterOut("_metadata"))
+    assert(!ListingFileCatalog.shouldFilterOut("_common_metadata"))
+    assert(ListingFileCatalog.shouldFilterOut("_ab_metadata"))
+    assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org