You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/18 04:26:25 UTC
spark git commit: Revert "[SPARK-17974] Refactor FileCatalog classes to simplify the inheritance tree"
Repository: spark
Updated Branches:
refs/heads/master 8daa1a29b -> 1c5a7d7f6
Revert "[SPARK-17974] Refactor FileCatalog classes to simplify the inheritance tree"
This reverts commit 8daa1a29b65a9b5337518458e9ece1619e8a01e3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c5a7d7f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c5a7d7f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c5a7d7f
Branch: refs/heads/master
Commit: 1c5a7d7f64993540baa5558be80130ee6911ba3c
Parents: 8daa1a2
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Oct 17 21:26:28 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Oct 17 21:26:28 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../sql/execution/DataSourceScanExec.scala | 4 +-
.../sql/execution/datasources/FileCatalog.scala | 66 ------
.../sql/execution/datasources/FileFormat.scala | 61 +++++
.../datasources/HadoopFsRelation.scala | 4 +-
.../PartitioningAwareFileCatalog.scala | 217 +-----------------
.../datasources/PartitioningUtils.scala | 12 +-
.../datasources/SessionFileCatalog.scala | 225 +++++++++++++++++++
.../datasources/TableFileCatalog.scala | 11 +-
.../datasources/FileCatalogSuite.scala | 10 -
.../datasources/SessionFileCatalogSuite.scala | 34 +++
.../ParquetPartitionDiscoverySuite.scala | 9 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +-
13 files changed, 354 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 073d2b1..7dccbbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index fdd1fa3..623d2be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -431,7 +431,7 @@ case class FileSourceScanExec(
private def createBucketedReadRDD(
bucketSpec: BucketSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
- selectedPartitions: Seq[PartitionDirectory],
+ selectedPartitions: Seq[Partition],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val bucketed =
@@ -463,7 +463,7 @@ case class FileSourceScanExec(
*/
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
- selectedPartitions: Seq[PartitionDirectory],
+ selectedPartitions: Seq[Partition],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
deleted file mode 100644
index 2bc66ce..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.fs._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait FileCatalog {
-
- /**
- * Returns the list of root input paths from which the catalog will get files. There may be a
- * single root path from which partitions are discovered, or individual partitions may be
- * specified by each path.
- */
- def rootPaths: Seq[Path]
-
- /**
- * Returns all valid files grouped into partitions when the data is partitioned. If the data is
- * unpartitioned, this will return a single partition with no partition values.
- *
- * @param filters The filters used to prune which partitions are returned. These filters must
- * only refer to partition columns and this method will only return files
- * where these predicates are guaranteed to evaluate to `true`. Thus, these
- * filters will not need to be evaluated again on the returned data.
- */
- def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
-
- /**
- * Returns the list of files that will be read when scanning this relation. This call may be
- * very expensive for large tables.
- */
- def inputFiles: Array[String]
-
- /** Refresh any cached file listings */
- def refresh(): Unit
-
- /** Sum of table file sizes, in bytes */
- def sizeInBytes: Long
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 9d153ce..e7239ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -175,3 +175,64 @@ abstract class TextBasedFileFormat extends FileFormat {
codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
}
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class Partition(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
+ */
+trait BasicFileCatalog {
+
+ /**
+ * Returns the list of root input paths from which the catalog will get files. There may be a
+ * single root path from which partitions are discovered, or individual partitions may be
+ * specified by each path.
+ */
+ def rootPaths: Seq[Path]
+
+ /**
+ * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+ * unpartitioned, this will return a single partition with no partition values.
+ *
+ * @param filters The filters used to prune which partitions are returned. These filters must
+ * only refer to partition columns and this method will only return files
+ * where these predicates are guaranteed to evaluate to `true`. Thus, these
+ * filters will not need to be evaluated again on the returned data.
+ */
+ def listFiles(filters: Seq[Expression]): Seq[Partition]
+
+ /** Returns the list of files that will be read when scanning this relation. */
+ def inputFiles: Array[String]
+
+ /** Refresh any cached file listings */
+ def refresh(): Unit
+
+ /** Sum of table file sizes, in bytes */
+ def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
+ * those, infer the relation's partition specification.
+ */
+// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
+// which it is safe to list all of its files?
+trait FileCatalog extends BasicFileCatalog {
+
+ /** Returns the specification of the partitions inferred from the data. */
+ def partitionSpec(): PartitionSpec
+
+ /** Returns all the valid files. */
+ def allFiles(): Seq[FileStatus]
+
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = allFiles().map(_.getLen).sum
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index afad889..db889ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
- location: FileCatalog,
+ location: BasicFileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5c8eff7..b250811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -17,21 +17,14 @@
package org.apache.spark.sql.execution.datasources
-import java.io.FileNotFoundException
-
import scala.collection.mutable
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
/**
@@ -45,24 +38,22 @@ import org.apache.spark.util.SerializableConfiguration
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
- partitionSchema: Option[StructType]) extends FileCatalog with Logging {
+ partitionSchema: Option[StructType])
+ extends SessionFileCatalog(sparkSession) with FileCatalog {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
- /** Returns the specification of the partitions inferred from the data. */
- def partitionSpec(): PartitionSpec
-
- protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+ override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
- PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+ Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
- case PartitionPath(values, path) =>
+ case PartitionDirectory(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
@@ -72,20 +63,14 @@ abstract class PartitioningAwareFileCatalog(
// Directory does not exist, or has no children files
Nil
}
- PartitionDirectory(values, files)
+ Partition(values, files)
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
selectedPartitions
}
- /** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- allFiles().map(_.getPath.toUri.toString).toArray
-
- override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-
- def allFiles(): Seq[FileStatus] = {
+ override def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
@@ -154,7 +139,7 @@ abstract class PartitioningAwareFileCatalog(
private def prunePartitions(
predicates: Seq[Expression],
- partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+ partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
val PartitionSpec(partitionColumns, partitions) = partitionSpec
val partitionColumnNames = partitionColumns.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
@@ -171,7 +156,7 @@ abstract class PartitioningAwareFileCatalog(
})
val selected = partitions.filter {
- case PartitionPath(values, _) => boundPredicate(values)
+ case PartitionDirectory(values, _) => boundPredicate(values)
}
logInfo {
val total = partitions.length
@@ -229,186 +214,8 @@ abstract class PartitioningAwareFileCatalog(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
-
- /**
- * List leaf files of given paths. This method will submit a Spark job to do parallel
- * listing whenever there is a path having more files than the parallel partition discovery
- * discovery threshold.
- *
- * This is publicly visible for testing.
- */
- def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val files =
- if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
- } else {
- PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
- }
-
- HiveCatalogMetrics.incrementFilesDiscovered(files.size)
- mutable.LinkedHashSet(files: _*)
- }
}
-object PartitioningAwareFileCatalog extends Logging {
+object PartitioningAwareFileCatalog {
val BASE_PATH_PARAM = "basePath"
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
-
- /**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[FileStatus] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- listLeafFiles0(fs, path, filter)
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
- */
- private def listLeafFilesInParallel(
- paths: Seq[Path],
- hadoopConf: Configuration,
- sparkSession: SparkSession): Seq[FileStatus] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
- val sparkContext = sparkSession.sparkContext
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
-
- // Set the number of parallelism to prevent following file listing from generating many tasks
- // in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
-
- val statuses = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
- val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
- }.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }.collect()
-
- // Turn SerializableFileStatus back to Status
- statuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
- blockLocations)
- }
- }
-
- /**
- * List a single path, provided as a FileStatus, in serial.
- */
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
- logTrace(s"Listing $path")
- val name = path.getName.toLowerCase
- if (shouldFilterOut(name)) {
- Seq.empty[FileStatus]
- } else {
- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
- // Note that statuses only include FileStatus for the files and dirs directly under path,
- // and does not include anything else recursively.
- val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
- logWarning(s"The directory $path was not found. Was it deleted very recently?")
- Array.empty[FileStatus]
- }
-
- val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
- }
-
- allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
- case f: LocatedFileStatus =>
- f
-
- // NOTE:
- //
- // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
- // operations, calling `getFileBlockLocations` does no harm here since these file system
- // implementations don't actually issue RPC for this method.
- //
- // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
- // be a big deal since we always use to `listLeafFilesInParallel` when the number of
- // paths exceeds threshold.
- case f =>
- // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
- // which is very slow on some file system (RawLocalFileSystem, which is launch a
- // subprocess and parse the stdout).
- val locations = fs.getFileBlockLocations(f, 0, f.getLen)
- val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
- f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
- if (f.isSymlink) {
- lfs.setSymlink(f.getSymlink)
- }
- lfs
- }
- }
- }
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter everything that starts with _ and ., except _common_metadata and _metadata
- // because Parquet needs to find those metadata files from leaf files returned by this method.
- // We should refactor this logic to not mix metadata files with data files.
- ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
- !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index ac6795b..5044642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -33,8 +33,8 @@ import org.apache.spark.sql.types._
// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
-object PartitionPath {
- def apply(values: InternalRow, path: String): PartitionPath =
+object PartitionDirectory {
+ def apply(values: InternalRow, path: String): PartitionDirectory =
apply(values, new Path(path))
}
@@ -42,14 +42,14 @@ object PartitionPath {
* Holds a directory in a partitioned collection of files as well as as the partition values
* in the form of a Row. Before scanning, the files at `path` need to be enumerated.
*/
-case class PartitionPath(values: InternalRow, path: Path)
+case class PartitionDirectory(values: InternalRow, path: Path)
case class PartitionSpec(
partitionColumns: StructType,
- partitions: Seq[PartitionPath])
+ partitions: Seq[PartitionDirectory])
object PartitionSpec {
- val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
+ val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory])
}
object PartitioningUtils {
@@ -141,7 +141,7 @@ object PartitioningUtils {
// Finally, we create `Partition`s based on paths and resolved partition values.
val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
case (PartitionValues(_, literals), (path, _)) =>
- PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
+ PartitionDirectory(InternalRow.fromSeq(literals.map(_.value)), path)
}
PartitionSpec(StructType(fields), partitions)
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
new file mode 100644
index 0000000..4807a92
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf
+ * files in a list of HDFS paths.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param ignoreFileNotFound (see [[ListingFileCatalog]])
+ */
+abstract class SessionFileCatalog(sparkSession: SparkSession)
+ extends BasicFileCatalog with Logging {
+ protected val hadoopConf: Configuration
+
+ /**
+ * List leaf files of given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition discovery
+ * discovery threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val files =
+ if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+ } else {
+ SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+ }
+
+ HiveCatalogMetrics.incrementFilesDiscovered(files.size)
+ mutable.LinkedHashSet(files: _*)
+ }
+}
+
+object SessionFileCatalog extends Logging {
+
+ /** A serializable variant of HDFS's BlockLocation. */
+ private case class SerializableBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
+ /** A serializable variant of HDFS's FileStatus. */
+ private case class SerializableFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long,
+ blockLocations: Array[SerializableBlockLocation])
+
+ /**
+ * List a collection of path recursively.
+ */
+ private def listLeafFilesInSerial(
+ paths: Seq[Path],
+ hadoopConf: Configuration): Seq[FileStatus] = {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+ paths.flatMap { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ listLeafFiles0(fs, path, filter)
+ }
+ }
+
+ /**
+ * List a collection of path recursively in parallel (using Spark executors).
+ * Each task launched will use [[listLeafFilesInSerial]] to list.
+ */
+ private def listLeafFilesInParallel(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ sparkSession: SparkSession): Seq[FileStatus] = {
+ assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+ val sparkContext = sparkSession.sparkContext
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+
+ // Set the number of parallelism to prevent following file listing from generating many tasks
+ // in case of large #defaultParallelism.
+ val numParallelism = Math.min(paths.size, 10000)
+
+ val statuses = sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { paths =>
+ val hadoopConf = serializableConfiguration.value
+ listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ }.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }.collect()
+
+ // Turn SerializableFileStatus back to Status
+ statuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+ blockLocations)
+ }
+ }
+
+ /**
+ * List a single path, provided as a FileStatus, in serial.
+ */
+ private def listLeafFiles0(
+ fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ logTrace(s"Listing $path")
+ val name = path.getName.toLowerCase
+ if (shouldFilterOut(name)) {
+ Seq.empty[FileStatus]
+ } else {
+ // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+ // Note that statuses only include FileStatus for the files and dirs directly under path,
+ // and does not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The directory $path was not found. Was it deleted very recently?")
+ Array.empty[FileStatus]
+ }
+
+ val allLeafStatuses = {
+ val (dirs, files) = statuses.partition(_.isDirectory)
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+ if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ }
+
+ allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus =>
+ f
+
+ // NOTE:
+ //
+ // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue RPC for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+ // be a big deal since we always use to `listLeafFilesInParallel` when the number of
+ // paths exceeds threshold.
+ case f =>
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+ // which is very slow on some file system (RawLocalFileSystem, which is launch a
+ // subprocess and parse the stdout).
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+ }
+ }
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // We filter everything that starts with _ and ., except _common_metadata and _metadata
+ // because Parquet needs to find those metadata files from leaf files returned by this method.
+ // We should refactor this logic to not mix metadata files with data files.
+ ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+ !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 5648ab4..a5c41b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType
/**
- * A [[FileCatalog]] for a metastore catalog table.
+ * A [[BasicFileCatalog]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param db the table's database name
@@ -38,9 +38,10 @@ class TableFileCatalog(
db: String,
table: String,
partitionSchema: Option[StructType],
- override val sizeInBytes: Long) extends FileCatalog {
+ override val sizeInBytes: Long)
+ extends SessionFileCatalog(sparkSession) {
- protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+ override protected val hadoopConf = sparkSession.sessionState.newHadoopConf
private val externalCatalog = sparkSession.sharedState.externalCatalog
@@ -50,7 +51,7 @@ class TableFileCatalog(
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
filterPartitions(filters).listFiles(Nil)
}
@@ -78,7 +79,7 @@ class TableFileCatalog(
case Some(schema) =>
val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
val partitions = selectedPartitions.map { p =>
- PartitionPath(p.toRow(schema), p.storage.locationUri.get)
+ PartitionDirectory(p.toRow(schema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(schema, partitions)
new PrunedTableFileCatalog(
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 9c43169..2695974 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -81,16 +81,6 @@ class FileCatalogSuite extends SharedSQLContext {
}
}
- test("PartitioningAwareFileCatalog - file filtering") {
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
- }
-
test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
new file mode 100644
index 0000000..df50958
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class SessionFileCatalogSuite extends SparkFunSuite {
+ // Checks which path names SessionFileCatalog.shouldFilterOut rejects.
+ test("file filtering") {
+ assert(!SessionFileCatalog.shouldFilterOut("abcd"))
+ assert(SessionFileCatalog.shouldFilterOut(".ab"))
+ assert(SessionFileCatalog.shouldFilterOut("_cd"))
+ // Only names starting with "_metadata"/"_common_metadata" escape the "_" filter here.
+ assert(!SessionFileCatalog.shouldFilterOut("_metadata"))
+ assert(!SessionFileCatalog.shouldFilterOut("_common_metadata"))
+ assert(SessionFileCatalog.shouldFilterOut("_ab_metadata"))
+ assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 36d4df0..43357c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionPath => Partition, PartitioningAwareFileCatalog, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -626,11 +626,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case LogicalRelation(
- HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
- assert(location.partitionSpec() === PartitionSpec.emptySpec)
+ case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) =>
+ assert(location.partitionSpec === PartitionSpec.emptySpec)
}.getOrElse {
- fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")
+ fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c5a7d7f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 16e1e37..4a2aaa7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.types._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org