Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/30 20:14:51 UTC
[1/2] spark git commit: [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex
Repository: spark
Updated Branches:
refs/heads/master 3ad99f166 -> 90d3b91f4
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1de863..624ab74 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -200,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
Seq(metastoreRelation.hiveQlTable.getDataLocation)
} else {
- // By convention (for example, see TableFileCatalog), the definition of a
+ // By convention (for example, see CatalogFileIndex), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
// Partitioned tables without partitions use the location of the table's base path.
// Partitioned tables with partitions use the locations of those partitions' data
@@ -227,7 +227,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val logicalRelation = cached.getOrElse {
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
- val catalog = new TableFileCatalog(
+ val catalog = new CatalogFileIndex(
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index ecdf4f1..fc35304 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -321,17 +321,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("DROP TABLE cachedTable")
}
- test("cache a table using TableFileCatalog") {
+ test("cache a table using CatalogFileIndex") {
withTable("test") {
sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
- location = tableFileCatalog,
+ location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
@@ -343,7 +343,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
- val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
val sameRelation = HadoopFsRelation(
location = sameCatalog,
partitionSchema = tableMeta.partitionSchema,
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 476383a..d8e31c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -256,7 +256,7 @@ class PartitionedTablePerfStatsSuite
// of doing plan cache validation based on the entire partition set.
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
- // 5 from table resolution, another 5 from ListingFileCatalog
+ // 5 from table resolution, another 5 from InMemoryFileIndex
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 59639aa..cdbc26c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -45,13 +45,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
- location = tableFileCatalog,
+ location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
---------------------------------------------------------------------
[2/2] spark git commit: [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex
Posted by rx...@apache.org.
[SPARK-18103][SQL] Rename *FileCatalog to *FileIndex
## What changes were proposed in this pull request?
To reduce the number of components in SQL named *Catalog, this patch renames *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions/files to scan given a filtering expression.
```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```
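For illustration, a minimal usage sketch of the renamed classes (not part of this commit): it assumes a local SparkSession and a made-up data directory, and lists files through the new InMemoryFileIndex (formerly ListingFileCatalog).
```
// Sketch only: "spark" and the path below are assumptions for illustration.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Index all files under a root path; no user-supplied partition schema.
val index = new InMemoryFileIndex(spark, Seq(new Path("/tmp/data")), Map.empty, None)

// With no pruning filters, every discovered partition (or the single
// unpartitioned directory) is returned.
index.listFiles(Nil).foreach(dir => dir.files.foreach(f => println(f.getPath)))
```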
cc yhuai marmbrus
## How was this patch tested?
N/A
Author: Eric Liang <ek...@databricks.com>
Author: Eric Liang <ek...@gmail.com>
Closes #15634 from ericl/rename-file-provider.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d3b91f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d3b91f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d3b91f
Branch: refs/heads/master
Commit: 90d3b91f4cb59d84fea7105d54ef8c87a7d5c6a2
Parents: 3ad99f1
Author: Eric Liang <ek...@databricks.com>
Authored: Sun Oct 30 13:14:45 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Oct 30 13:14:45 2016 -0700
----------------------------------------------------------------------
.../spark/metrics/source/StaticSources.scala | 2 +-
.../spark/sql/execution/CacheManager.scala | 2 +-
.../datasources/CatalogFileIndex.scala | 110 +++++
.../sql/execution/datasources/DataSource.scala | 10 +-
.../sql/execution/datasources/FileCatalog.scala | 70 ---
.../sql/execution/datasources/FileIndex.scala | 70 +++
.../datasources/HadoopFsRelation.scala | 4 +-
.../datasources/InMemoryFileIndex.scala | 87 ++++
.../datasources/ListingFileCatalog.scala | 87 ----
.../PartitioningAwareFileCatalog.scala | 437 -------------------
.../PartitioningAwareFileIndex.scala | 437 +++++++++++++++++++
.../datasources/PruneFileSourcePartitions.scala | 6 +-
.../datasources/TableFileCatalog.scala | 110 -----
.../streaming/CompactibleFileStreamLog.scala | 4 +-
.../execution/streaming/FileStreamSource.scala | 4 +-
.../streaming/MetadataLogFileCatalog.scala | 6 +-
.../datasources/FileCatalogSuite.scala | 36 +-
.../datasources/FileSourceStrategySuite.scala | 2 +-
.../ParquetPartitionDiscoverySuite.scala | 2 +-
.../sql/streaming/FileStreamSinkSuite.scala | 6 +-
.../sql/streaming/FileStreamSourceSuite.scala | 2 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +-
.../spark/sql/hive/CachedTableSuite.scala | 10 +-
.../hive/PartitionedTablePerfStatsSuite.scala | 2 +-
.../PruneFileSourcePartitionsSuite.scala | 6 +-
25 files changed, 758 insertions(+), 758 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b54885b..3f7cfd9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
/**
- * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+ * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
*/
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index fb72c67..526623a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -177,7 +177,7 @@ class CacheManager extends Logging {
/**
* Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
- * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+ * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
* in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
* false.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
new file mode 100644
index 0000000..092aabc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[FileIndex]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param table the metadata of the table
+ * @param sizeInBytes the table's data size in bytes
+ */
+class CatalogFileIndex(
+ sparkSession: SparkSession,
+ val table: CatalogTable,
+ override val sizeInBytes: Long) extends FileIndex {
+
+ protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+
+ private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+
+ assert(table.identifier.database.isDefined,
+ "The table identifier must be qualified in CatalogFileIndex")
+
+ private val baseLocation = table.storage.locationUri
+
+ override def partitionSchema: StructType = table.partitionSchema
+
+ override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+ override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ filterPartitions(filters).listFiles(Nil)
+ }
+
+ override def refresh(): Unit = fileStatusCache.invalidateAll()
+
+ /**
+ * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
+ * specified by the given partition-pruning filters.
+ *
+ * @param filters partition-pruning filters
+ */
+ def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
+ if (table.partitionColumnNames.nonEmpty) {
+ val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+ table.identifier, filters)
+ val partitions = selectedPartitions.map { p =>
+ PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+ }
+ val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ new PrunedInMemoryFileIndex(
+ sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+ } else {
+ new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
+ }
+ }
+
+ override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+
+ // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+ // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
+ // implement `equals` and `hashCode` here, to make it work with cache lookup.
+ override def equals(o: Any): Boolean = o match {
+ case other: CatalogFileIndex => this.table.identifier == other.table.identifier
+ case _ => false
+ }
+
+ override def hashCode(): Int = table.identifier.hashCode()
+}
+
+/**
+ * An override of the standard HDFS listing based catalog, that overrides the partition spec with
+ * the information from the metastore.
+ *
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
+ */
+private class PrunedInMemoryFileIndex(
+ sparkSession: SparkSession,
+ tableBasePath: Path,
+ fileStatusCache: FileStatusCache,
+ override val partitionSpec: PartitionSpec)
+ extends InMemoryFileIndex(
+ sparkSession,
+ partitionSpec.partitions.map(_.path),
+ Map.empty,
+ Some(partitionSpec.partitionColumns),
+ fileStatusCache)
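A hypothetical usage sketch of the class added above, adapted from the test changes later in this commit (it assumes an active `spark` session and an existing partitioned table `default.test`):
```
// Sketch only: "spark" and the table "default.test" are assumed to exist
// (see CachedTableSuite below, which creates such a table).
import org.apache.spark.sql.execution.datasources.CatalogFileIndex

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val catalogIndex = new CatalogFileIndex(spark, tableMeta, 0)

// filterPartitions queries the metastore only for matching partitions and
// returns a PrunedInMemoryFileIndex restricted to them; Nil keeps them all.
val pruned = catalogIndex.filterPartitions(Nil)
println(pruned.inputFiles.length)
```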
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5b8f05a..9961098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -202,7 +202,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
+ val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
val partitionSchema = fileCatalog.partitionSpec().partitionColumns
val inferred = format.inferSchema(
sparkSession,
@@ -364,7 +364,7 @@ case class DataSource(
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
- val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
+ val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -417,12 +417,12 @@ case class DataSource(
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
- new TableFileCatalog(
+ new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
- new ListingFileCatalog(
+ new InMemoryFileIndex(
sparkSession, globbedPaths, options, partitionSchema)
}
@@ -433,7 +433,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
+ fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
deleted file mode 100644
index dba6462..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.fs._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait FileCatalog {
-
- /**
- * Returns the list of root input paths from which the catalog will get files. There may be a
- * single root path from which partitions are discovered, or individual partitions may be
- * specified by each path.
- */
- def rootPaths: Seq[Path]
-
- /**
- * Returns all valid files grouped into partitions when the data is partitioned. If the data is
- * unpartitioned, this will return a single partition with no partition values.
- *
- * @param filters The filters used to prune which partitions are returned. These filters must
- * only refer to partition columns and this method will only return files
- * where these predicates are guaranteed to evaluate to `true`. Thus, these
- * filters will not need to be evaluated again on the returned data.
- */
- def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
-
- /**
- * Returns the list of files that will be read when scanning this relation. This call may be
- * very expensive for large tables.
- */
- def inputFiles: Array[String]
-
- /** Refresh any cached file listings */
- def refresh(): Unit
-
- /** Sum of table file sizes, in bytes */
- def sizeInBytes: Long
-
- /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
- def partitionSchema: StructType
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
new file mode 100644
index 0000000..277223d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
+ */
+trait FileIndex {
+
+ /**
+ * Returns the list of root input paths from which the catalog will get files. There may be a
+ * single root path from which partitions are discovered, or individual partitions may be
+ * specified by each path.
+ */
+ def rootPaths: Seq[Path]
+
+ /**
+ * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+ * unpartitioned, this will return a single partition with no partition values.
+ *
+ * @param filters The filters used to prune which partitions are returned. These filters must
+ * only refer to partition columns and this method will only return files
+ * where these predicates are guaranteed to evaluate to `true`. Thus, these
+ * filters will not need to be evaluated again on the returned data.
+ */
+ def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+
+ /**
+ * Returns the list of files that will be read when scanning this relation. This call may be
+ * very expensive for large tables.
+ */
+ def inputFiles: Array[String]
+
+ /** Refresh any cached file listings */
+ def refresh(): Unit
+
+ /** Sum of table file sizes, in bytes */
+ def sizeInBytes: Long
+
+ /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+ def partitionSchema: StructType
+}
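For reference, a hypothetical minimal implementation of the trait above (illustration only; `StaticFileIndex` is not part of this commit). It serves a fixed, unpartitioned set of files and ignores pruning filters:
```
// Sketch only: StaticFileIndex is an invented name for illustration.
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

class StaticFileIndex(files: Seq[FileStatus]) extends FileIndex {
  override def rootPaths: Seq[Path] = files.map(_.getPath)
  // Unpartitioned: everything lands in one PartitionDirectory with empty values.
  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    PartitionDirectory(InternalRow.empty, files) :: Nil
  override def inputFiles: Array[String] = files.map(_.getPath.toString).toArray
  override def refresh(): Unit = ()      // nothing cached, nothing to refresh
  override def sizeInBytes: Long = files.map(_.getLen).sum
  override def partitionSchema: StructType = new StructType()  // no partition columns
}
```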
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index afad889..014abd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileIndex]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
- location: FileCatalog,
+ location: FileIndex,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
new file mode 100644
index 0000000..7531f0a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[FileIndex]] that generates the list of files to process by recursively listing all the
+ * files present in `paths`.
+ *
+ * @param rootPaths the list of root table paths to scan
+ * @param parameters as set of options to control discovery
+ * @param partitionSchema an optional partition schema that will be use to provide types for the
+ * discovered partitions
+ */
+class InMemoryFileIndex(
+ sparkSession: SparkSession,
+ override val rootPaths: Seq[Path],
+ parameters: Map[String, String],
+ partitionSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache)
+ extends PartitioningAwareFileIndex(
+ sparkSession, parameters, partitionSchema, fileStatusCache) {
+
+ @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+ @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+ @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+ refresh0()
+
+ override def partitionSpec(): PartitionSpec = {
+ if (cachedPartitionSpec == null) {
+ cachedPartitionSpec = inferPartitioning()
+ }
+ logTrace(s"Partition spec: $cachedPartitionSpec")
+ cachedPartitionSpec
+ }
+
+ override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+ cachedLeafFiles
+ }
+
+ override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+ cachedLeafDirToChildrenFiles
+ }
+
+ override def refresh(): Unit = {
+ refresh0()
+ fileStatusCache.invalidateAll()
+ }
+
+ private def refresh0(): Unit = {
+ val files = listLeafFiles(rootPaths)
+ cachedLeafFiles =
+ new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+ cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+ cachedPartitionSpec = null
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+ case _ => false
+ }
+
+ override def hashCode(): Int = rootPaths.toSet.hashCode()
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
deleted file mode 100644
index d9d5883..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.StructType
-
-
-/**
- * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
- * files present in `paths`.
- *
- * @param rootPaths the list of root table paths to scan
- * @param parameters as set of options to control discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- * discovered partitions
- */
-class ListingFileCatalog(
- sparkSession: SparkSession,
- override val rootPaths: Seq[Path],
- parameters: Map[String, String],
- partitionSchema: Option[StructType],
- fileStatusCache: FileStatusCache = NoopCache)
- extends PartitioningAwareFileCatalog(
- sparkSession, parameters, partitionSchema, fileStatusCache) {
-
- @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
- @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
- @volatile private var cachedPartitionSpec: PartitionSpec = _
-
- refresh0()
-
- override def partitionSpec(): PartitionSpec = {
- if (cachedPartitionSpec == null) {
- cachedPartitionSpec = inferPartitioning()
- }
- logTrace(s"Partition spec: $cachedPartitionSpec")
- cachedPartitionSpec
- }
-
- override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
- cachedLeafFiles
- }
-
- override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
- cachedLeafDirToChildrenFiles
- }
-
- override def refresh(): Unit = {
- refresh0()
- fileStatusCache.invalidateAll()
- }
-
- private def refresh0(): Unit = {
- val files = listLeafFiles(rootPaths)
- cachedLeafFiles =
- new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
- cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
- cachedPartitionSpec = null
- }
-
- override def equals(other: Any): Boolean = other match {
- case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
- case _ => false
- }
-
- override def hashCode(): Int = rootPaths.toSet.hashCode()
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
deleted file mode 100644
index cc4049e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.FileNotFoundException
-
-import scala.collection.mutable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
- * It provides the necessary methods to parse partition data based on a set of files.
- *
- * @param parameters as set of options to control partition discovery
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- * the discovered partitions
- */
-abstract class PartitioningAwareFileCatalog(
- sparkSession: SparkSession,
- parameters: Map[String, String],
- userPartitionSchema: Option[StructType],
- fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
- import PartitioningAwareFileCatalog.BASE_PATH_PARAM
-
- /** Returns the specification of the partitions inferred from the data. */
- def partitionSpec(): PartitionSpec
-
- override def partitionSchema: StructType = partitionSpec().partitionColumns
-
- protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
-
- protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
-
- protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
-
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
- val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
- PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
- } else {
- prunePartitions(filters, partitionSpec()).map {
- case PartitionPath(values, path) =>
- val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
- case Some(existingDir) =>
- // Directory has children files in it, return them
- existingDir.filter(f => isDataPath(f.getPath))
-
- case None =>
- // Directory does not exist, or has no children files
- Nil
- }
- PartitionDirectory(values, files)
- }
- }
- logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
- selectedPartitions
- }
-
- /** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- allFiles().map(_.getPath.toUri.toString).toArray
-
- override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-
- def allFiles(): Seq[FileStatus] = {
- if (partitionSpec().partitionColumns.isEmpty) {
- // For each of the root input paths, get the list of files inside them
- rootPaths.flatMap { path =>
- // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
- val fs = path.getFileSystem(hadoopConf)
- val qualifiedPathPre = fs.makeQualified(path)
- val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
- // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
- // because the `leafFile.getParent` would have returned an absolute path with the
- // separator at the end.
- new Path(qualifiedPathPre, Path.SEPARATOR)
- } else {
- qualifiedPathPre
- }
-
- // There are three cases possible with each path
- // 1. The path is a directory and has children files in it. Then it must be present in
- // leafDirToChildrenFiles as those children files will have been found as leaf files.
- // Find its children files from leafDirToChildrenFiles and include them.
- // 2. The path is a file, then it will be present in leafFiles. Include this path.
- // 3. The path is a directory, but has no children files. Do not include this path.
-
- leafDirToChildrenFiles.get(qualifiedPath)
- .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
- .getOrElse(Array.empty)
- }
- } else {
- leafFiles.values.toSeq
- }
- }
-
- protected def inferPartitioning(): PartitionSpec = {
- // We use leaf dirs containing data files to discover the schema.
- val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
- files.exists(f => isDataPath(f.getPath))
- }.keys.toSeq
- userPartitionSchema match {
- case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
- val spec = PartitioningUtils.parsePartitions(
- leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = false,
- basePaths = basePaths)
-
- // Without auto inference, all of value in the `row` should be null or in StringType,
- // we need to cast into the data type that user specified.
- def castPartitionValuesToUserSchema(row: InternalRow) = {
- InternalRow((0 until row.numFields).map { i =>
- Cast(
- Literal.create(row.getUTF8String(i), StringType),
- userProvidedSchema.fields(i).dataType).eval()
- }: _*)
- }
-
- PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
- part.copy(values = castPartitionValuesToUserSchema(part.values))
- })
- case _ =>
- PartitioningUtils.parsePartitions(
- leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
- basePaths = basePaths)
- }
- }
-
- private def prunePartitions(
- predicates: Seq[Expression],
- partitionSpec: PartitionSpec): Seq[PartitionPath] = {
- val PartitionSpec(partitionColumns, partitions) = partitionSpec
- val partitionColumnNames = partitionColumns.map(_.name).toSet
- val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
-
- if (partitionPruningPredicates.nonEmpty) {
- val predicate = partitionPruningPredicates.reduce(expressions.And)
-
- val boundPredicate = InterpretedPredicate.create(predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable = true)
- })
-
- val selected = partitions.filter {
- case PartitionPath(values, _) => boundPredicate(values)
- }
- logInfo {
- val total = partitions.length
- val selectedSize = selected.length
- val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
- s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
- }
-
- selected
- } else {
- partitions
- }
- }
-
- /**
- * Contains a set of paths that are considered as the base dirs of the input datasets.
- * The partitioning discovery logic will make sure it will stop when it reaches any
- * base path.
- *
- * By default, the paths of the dataset provided by users will be base paths.
- * Below are three typical examples,
- * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
- * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
- * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be
- * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
- * `something`.
- * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
- * DataFrame will have the column of `something`.
- *
- * Users also can override the basePath by setting `basePath` in the options to pass the new base
- * path to the data source.
- * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
- * and the returned DataFrame will have the column of `something`.
- */
- private def basePaths: Set[Path] = {
- parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
- case Some(userDefinedBasePath) =>
- val fs = userDefinedBasePath.getFileSystem(hadoopConf)
- if (!fs.isDirectory(userDefinedBasePath)) {
- throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
- }
- Set(fs.makeQualified(userDefinedBasePath))
-
- case None =>
- rootPaths.map { path =>
- // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
- val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
- if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
- }
- }
-
- // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
- // counted as data files, so that they shouldn't participate partition discovery.
- private def isDataPath(path: Path): Boolean = {
- val name = path.getName
- !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
- }
-
- /**
- * List leaf files of given paths. This method will submit a Spark job to do parallel
- * listing whenever there is a path having more files than the parallel partition discovery
- * discovery threshold.
- *
- * This is publicly visible for testing.
- */
- def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val output = mutable.LinkedHashSet[FileStatus]()
- val pathsToFetch = mutable.ArrayBuffer[Path]()
- for (path <- paths) {
- fileStatusCache.getLeafFiles(path) match {
- case Some(files) =>
- HiveCatalogMetrics.incrementFileCacheHits(files.length)
- output ++= files
- case None =>
- pathsToFetch += path
- }
- }
- val discovered = if (pathsToFetch.length >=
- sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
- } else {
- PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
- }
- discovered.foreach { case (path, leafFiles) =>
- HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
- fileStatusCache.putLeafFiles(path, leafFiles.toArray)
- output ++= leafFiles
- }
- output
- }
-}
-
-object PartitioningAwareFileCatalog extends Logging {
- val BASE_PATH_PARAM = "basePath"
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
-
- /**
- * List a collection of path recursively.
- */
- private def listLeafFilesInSerial(
- paths: Seq[Path],
- hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass)
- val filter = FileInputFormat.getInputPathFilter(jobConf)
-
- paths.map { path =>
- val fs = path.getFileSystem(hadoopConf)
- (path, listLeafFiles0(fs, path, filter))
- }
- }
-
- /**
- * List a collection of path recursively in parallel (using Spark executors).
- * Each task launched will use [[listLeafFilesInSerial]] to list.
- */
- private def listLeafFilesInParallel(
- paths: Seq[Path],
- hadoopConf: Configuration,
- sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
- assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
- val sparkContext = sparkSession.sparkContext
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
-
- // Set the number of parallelism to prevent following file listing from generating many tasks
- // in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
-
- val statusMap = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
- val hadoopConf = serializableConfiguration.value
- listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
- }.map { case (path, statuses) =>
- val serializableStatuses = statuses.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }
- (path.toString, serializableStatuses)
- }.collect()
-
- // turn SerializableFileStatus back to Status
- statusMap.map { case (path, serializableStatuses) =>
- val statuses = serializableStatuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
- new Path(f.path)),
- blockLocations)
- }
- (new Path(path), statuses)
- }
- }
-
- /**
- * List a single path, provided as a FileStatus, in serial.
- */
- private def listLeafFiles0(
- fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
- logTrace(s"Listing $path")
- val name = path.getName.toLowerCase
- if (shouldFilterOut(name)) {
- Seq.empty[FileStatus]
- } else {
- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
- // Note that statuses only include FileStatus for the files and dirs directly under path,
- // and does not include anything else recursively.
- val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
- logWarning(s"The directory $path was not found. Was it deleted very recently?")
- Array.empty[FileStatus]
- }
-
- val allLeafStatuses = {
- val (dirs, files) = statuses.partition(_.isDirectory)
- val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
- if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
- }
-
- allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
- case f: LocatedFileStatus =>
- f
-
- // NOTE:
- //
- // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
- // operations, calling `getFileBlockLocations` does no harm here since these file system
- // implementations don't actually issue RPC for this method.
- //
- // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
- // be a big deal since we always use to `listLeafFilesInParallel` when the number of
- // paths exceeds threshold.
- case f =>
- // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
- // which is very slow on some file system (RawLocalFileSystem, which is launch a
- // subprocess and parse the stdout).
- val locations = fs.getFileBlockLocations(f, 0, f.getLen)
- val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
- f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
- if (f.isSymlink) {
- lfs.setSymlink(f.getSymlink)
- }
- lfs
- }
- }
- }
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // We filter everything that starts with _ and ., except _common_metadata and _metadata
- // because Parquet needs to find those metadata files from leaf files returned by this method.
- // We should refactor this logic to not mix metadata files with data files.
- ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
- !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
new file mode 100644
index 0000000..a8a722d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
+ * It provides the necessary methods to parse partition data based on a set of files.
+ *
+ * @param parameters as set of options to control partition discovery
+ * @param userPartitionSchema an optional partition schema that will be use to provide types for
+ * the discovered partitions
+ */
+abstract class PartitioningAwareFileIndex(
+ sparkSession: SparkSession,
+ parameters: Map[String, String],
+ userPartitionSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
+ import PartitioningAwareFileIndex.BASE_PATH_PARAM
+
+ /** Returns the specification of the partitions inferred from the data. */
+ def partitionSpec(): PartitionSpec
+
+ override def partitionSchema: StructType = partitionSpec().partitionColumns
+
+ protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+
+ protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
+
+ protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
+
+ override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
+ PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+ } else {
+ prunePartitions(filters, partitionSpec()).map {
+ case PartitionPath(values, path) =>
+ val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+ case Some(existingDir) =>
+ // Directory has children files in it, return them
+ existingDir.filter(f => isDataPath(f.getPath))
+
+ case None =>
+ // Directory does not exist, or has no children files
+ Nil
+ }
+ PartitionDirectory(values, files)
+ }
+ }
+ logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+ selectedPartitions
+ }
+
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = allFiles().map(_.getLen).sum
+
+ def allFiles(): Seq[FileStatus] = {
+ if (partitionSpec().partitionColumns.isEmpty) {
+ // For each of the root input paths, get the list of files inside them
+ rootPaths.flatMap { path =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val fs = path.getFileSystem(hadoopConf)
+ val qualifiedPathPre = fs.makeQualified(path)
+ val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
+ // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
+ // because the `leafFile.getParent` would have returned an absolute path with the
+ // separator at the end.
+ new Path(qualifiedPathPre, Path.SEPARATOR)
+ } else {
+ qualifiedPathPre
+ }
+
+ // There are three cases possible with each path
+ // 1. The path is a directory and has children files in it. Then it must be present in
+ // leafDirToChildrenFiles as those children files will have been found as leaf files.
+ // Find its children files from leafDirToChildrenFiles and include them.
+ // 2. The path is a file, then it will be present in leafFiles. Include this path.
+ // 3. The path is a directory, but has no children files. Do not include this path.
+
+ leafDirToChildrenFiles.get(qualifiedPath)
+ .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+ .getOrElse(Array.empty)
+ }
+ } else {
+ leafFiles.values.toSeq
+ }
+ }
+
+ protected def inferPartitioning(): PartitionSpec = {
+ // We use leaf dirs containing data files to discover the schema.
+ val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+ files.exists(f => isDataPath(f.getPath))
+ }.keys.toSeq
+ userPartitionSchema match {
+ case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+ val spec = PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = false,
+ basePaths = basePaths)
+
+ // Without type inference, all values in the `row` should be null or of StringType, so
+ // we need to cast them into the data types that the user specified.
+ def castPartitionValuesToUserSchema(row: InternalRow) = {
+ InternalRow((0 until row.numFields).map { i =>
+ Cast(
+ Literal.create(row.getUTF8String(i), StringType),
+ userProvidedSchema.fields(i).dataType).eval()
+ }: _*)
+ }
+
+ PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+ part.copy(values = castPartitionValuesToUserSchema(part.values))
+ })
+ case _ =>
+ PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+ basePaths = basePaths)
+ }
+ }
+
+ private def prunePartitions(
+ predicates: Seq[Expression],
+ partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+ val PartitionSpec(partitionColumns, partitions) = partitionSpec
+ val partitionColumnNames = partitionColumns.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = InterpretedPredicate.create(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionColumns.indexWhere(a.name == _.name)
+ BoundReference(index, partitionColumns(index).dataType, nullable = true)
+ })
+
+ val selected = partitions.filter {
+ case PartitionPath(values, _) => boundPredicate(values)
+ }
+ logInfo {
+ val total = partitions.length
+ val selectedSize = selected.length
+ val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+ s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
+ }
+
+ selected
+ } else {
+ partitions
+ }
+ }
+
+ /**
+ * Contains a set of paths that are considered as the base dirs of the input datasets.
+ * The partition discovery logic will make sure it stops when it reaches any
+ * base path.
+ *
+ * By default, the paths of the dataset provided by users will be base paths.
+ * Below are three typical examples,
+ * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
+ * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
+ * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be
+ * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
+ * `something`.
+ * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
+ * DataFrame will have the column of `something`.
+ *
+ * Users can also override the base path by setting `basePath` in the options to pass a new
+ * base path to the data source. For example, with
+ * `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
+ * the returned DataFrame will have the column `something`.
+ */
+ private def basePaths: Set[Path] = {
+ parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
+ case Some(userDefinedBasePath) =>
+ val fs = userDefinedBasePath.getFileSystem(hadoopConf)
+ if (!fs.isDirectory(userDefinedBasePath)) {
+ throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
+ }
+ Set(fs.makeQualified(userDefinedBasePath))
+
+ case None =>
+ rootPaths.map { path =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
+ if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
+ }
+ }
+
+ // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+ // counted as data files, so they should not participate in partition discovery.
+ private def isDataPath(path: Path): Boolean = {
+ val name = path.getName
+ !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+ }
+
+ /**
+ * List leaf files of the given paths. This method will submit a Spark job to do the listing
+ * in parallel whenever the number of paths to list exceeds the parallel partition discovery
+ * threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val output = mutable.LinkedHashSet[FileStatus]()
+ val pathsToFetch = mutable.ArrayBuffer[Path]()
+ for (path <- paths) {
+ fileStatusCache.getLeafFiles(path) match {
+ case Some(files) =>
+ HiveCatalogMetrics.incrementFileCacheHits(files.length)
+ output ++= files
+ case None =>
+ pathsToFetch += path
+ }
+ }
+ val discovered = if (pathsToFetch.length >=
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
+ } else {
+ PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
+ }
+ discovered.foreach { case (path, leafFiles) =>
+ HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+ fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+ output ++= leafFiles
+ }
+ output
+ }
+}
+
+object PartitioningAwareFileIndex extends Logging {
+ val BASE_PATH_PARAM = "basePath"
+
+ /** A serializable variant of HDFS's BlockLocation. */
+ private case class SerializableBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
+ /** A serializable variant of HDFS's FileStatus. */
+ private case class SerializableFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long,
+ blockLocations: Array[SerializableBlockLocation])
+
+ /**
+ * List a collection of paths recursively.
+ */
+ private def listLeafFilesInSerial(
+ paths: Seq[Path],
+ hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass)
+ val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+ paths.map { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ (path, listLeafFiles0(fs, path, filter))
+ }
+ }
+
+ /**
+ * List a collection of paths recursively in parallel (using Spark executors).
+ * Each task launched will use [[listLeafFilesInSerial]] to list.
+ */
+ private def listLeafFilesInParallel(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+ assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+ val sparkContext = sparkSession.sparkContext
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+
+ // Cap the degree of parallelism to prevent the following file listing from generating too
+ // many tasks when defaultParallelism is large.
+ val numParallelism = Math.min(paths.size, 10000)
+
+ val statusMap = sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { paths =>
+ val hadoopConf = serializableConfiguration.value
+ listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+ }.map { case (path, statuses) =>
+ val serializableStatuses = statuses.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }
+ (path.toString, serializableStatuses)
+ }.collect()
+
+ // Turn SerializableFileStatus back to FileStatus
+ statusMap.map { case (path, serializableStatuses) =>
+ val statuses = serializableStatuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
+ new Path(f.path)),
+ blockLocations)
+ }
+ (new Path(path), statuses)
+ }
+ }
+
+ /**
+ * Recursively list all files under a single path, in serial.
+ */
+ private def listLeafFiles0(
+ fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+ logTrace(s"Listing $path")
+ val name = path.getName.toLowerCase
+ if (shouldFilterOut(name)) {
+ Seq.empty[FileStatus]
+ } else {
+ // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
+ // Note that statuses only include FileStatus for the files and dirs directly under path,
+ // and does not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch {
+ case _: FileNotFoundException =>
+ logWarning(s"The directory $path was not found. Was it deleted very recently?")
+ Array.empty[FileStatus]
+ }
+
+ val allLeafStatuses = {
+ val (dirs, files) = statuses.partition(_.isDirectory)
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+ if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+ }
+
+ allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus =>
+ f
+
+ // NOTE:
+ //
+ // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue RPCs for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+ // be a big deal since we always use `listLeafFilesInParallel` when the number of
+ // paths exceeds the threshold.
+ case f =>
+ // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+ // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
+ // subprocess and parses the stdout).
+ val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+ val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+ f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+ if (f.isSymlink) {
+ lfs.setSymlink(f.getSymlink)
+ }
+ lfs
+ }
+ }
+ }
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // We filter everything that starts with _ and ., except _common_metadata and _metadata
+ // because Parquet needs to find those metadata files from leaf files returned by this method.
+ // We should refactor this logic to not mix metadata files with data files.
+ ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+ !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+ }
+}
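The base-path handling documented in the `basePaths` comment above is easiest to see from the reader side. Below is a minimal sketch (not part of the commit) that exercises the documented cases; the local directory layout `/tmp/data/something=true/` is hypothetical, and only the effect of the `basePath` option and the presence or absence of the `something` column come from the documentation above.

import org.apache.spark.sql.SparkSession

object BasePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("basePath sketch")
      .getOrCreate()

    // Case 1: the partition directory itself is the base path, so the
    // partition column `something` is not part of the schema.
    val df1 = spark.read.parquet("/tmp/data/something=true/")
    df1.printSchema()

    // Case 3: the parent directory is the base path, so `something`
    // is discovered as a partition column.
    val df3 = spark.read.parquet("/tmp/data/")
    df3.printSchema()

    // Overriding the base path: read a single partition directory but keep
    // `something` as a column by passing the `basePath` option.
    val dfOverride = spark.read
      .option("basePath", "/tmp/data/")
      .parquet("/tmp/data/something=true/")
    dfOverride.printSchema()

    spark.stop()
  }
}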
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8689017..8566a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
logicalRelation @
LogicalRelation(fsRelation @
HadoopFsRelation(
- tableFileCatalog: TableFileCatalog,
+ catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
@@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
if (partitionKeyFilters.nonEmpty) {
- val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+ val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
- fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+ fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(
relation = prunedFsRelation,
expectedOutputAttributes = Some(logicalRelation.output))
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
deleted file mode 100644
index b459df5..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
-
-
-/**
- * A [[FileCatalog]] for a metastore catalog table.
- *
- * @param sparkSession a [[SparkSession]]
- * @param table the metadata of the table
- * @param sizeInBytes the table's data size in bytes
- */
-class TableFileCatalog(
- sparkSession: SparkSession,
- val table: CatalogTable,
- override val sizeInBytes: Long) extends FileCatalog {
-
- protected val hadoopConf = sparkSession.sessionState.newHadoopConf
-
- private val fileStatusCache = FileStatusCache.newCache(sparkSession)
-
- assert(table.identifier.database.isDefined,
- "The table identifier must be qualified in TableFileCatalog")
-
- private val baseLocation = table.storage.locationUri
-
- override def partitionSchema: StructType = table.partitionSchema
-
- override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
-
- override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
- filterPartitions(filters).listFiles(Nil)
- }
-
- override def refresh(): Unit = fileStatusCache.invalidateAll()
-
- /**
- * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
- * specified by the given partition-pruning filters.
- *
- * @param filters partition-pruning filters
- */
- def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
- if (table.partitionColumnNames.nonEmpty) {
- val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
- table.identifier, filters)
- val partitions = selectedPartitions.map { p =>
- PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
- }
- val partitionSpec = PartitionSpec(partitionSchema, partitions)
- new PrunedTableFileCatalog(
- sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
- } else {
- new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
- }
- }
-
- override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
-
- // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
- // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
- // implement `equals` and `hashCode` here, to make it work with cache lookup.
- override def equals(o: Any): Boolean = o match {
- case other: TableFileCatalog => this.table.identifier == other.table.identifier
- case _ => false
- }
-
- override def hashCode(): Int = table.identifier.hashCode()
-}
-
-/**
- * An override of the standard HDFS listing based catalog, that overrides the partition spec with
- * the information from the metastore.
- *
- * @param tableBasePath The default base path of the Hive metastore table
- * @param partitionSpec The partition specifications from Hive metastore
- */
-private class PrunedTableFileCatalog(
- sparkSession: SparkSession,
- tableBasePath: Path,
- fileStatusCache: FileStatusCache,
- override val partitionSpec: PartitionSpec)
- extends ListingFileCatalog(
- sparkSession,
- partitionSpec.partitions.map(_.path),
- Map.empty,
- Some(partitionSpec.partitionColumns),
- fileStatusCache)
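For callers, the rename is mechanical: code that previously built the deleted TableFileCatalog now builds a CatalogFileIndex and calls the same filterPartitions/listFiles methods, as the PruneFileSourcePartitions hunk above shows. The sketch below is illustrative and not part of the commit: the table name `default.test`, the package and object names are made up, and it assumes CatalogFileIndex keeps TableFileCatalog's (sparkSession, table, sizeInBytes) constructor; it also lives under org.apache.spark.sql because sessionState is an internal (private[sql]) API.

package org.apache.spark.sql.examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.CatalogFileIndex

object CatalogFileIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("CatalogFileIndex sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Look up the CatalogTable metadata for an existing partitioned table
    // (the name "default.test" is only an example).
    val tableMeta = spark.sessionState.catalog.getTableMetadata(
      TableIdentifier("test", Some("default")))

    // Assumed constructor shape, mirroring the deleted TableFileCatalog:
    // (sparkSession, table, sizeInBytes).
    val fileIndex = new CatalogFileIndex(spark, tableMeta, 0L)

    // An empty filter list keeps every partition; PruneFileSourcePartitions
    // passes the normalized partition-key filters here instead.
    val prunedIndex = fileIndex.filterPartitions(Nil)
    prunedIndex.listFiles(Nil).foreach { dir =>
      dir.files.foreach(f => println(f.getPath))
    }

    spark.stop()
  }
}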
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index c14feea..b26edee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
*/
def allFiles(): Array[T] = {
var latestId = getLatest().map(_._1).getOrElse(-1L)
- // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
+ // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
// is calling this method. This loop will retry the reading to deal with the
// race condition.
while (true) {
@@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
} catch {
case e: IOException =>
// Another process using `CompactibleFileStreamLog` may delete the batch files when
- // `StreamFileCatalog` are reading. However, it only happens when a compaction is
+ // `StreamFileIndex` is reading. However, it only happens when a compaction is
// deleting old files. If so, let's try the next compaction batch and we should find it.
// Otherwise, this is a real IO issue and we should throw it.
latestId = nextCompactionBatchId(latestId, compactInterval)
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a392b82..680df01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType
/**
@@ -156,7 +156,7 @@ class FileStreamSource(
private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
- val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+ val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 82b67cb..aeaa134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._
/**
- * A [[FileCatalog]] that generates the list of files to processing by reading them from the
+ * A [[FileIndex]] that generates the list of files to process by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*/
-class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
- extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+ extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 9c43169..56df1fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
-class FileCatalogSuite extends SharedSQLContext {
+class FileIndexSuite extends SharedSQLContext {
- test("ListingFileCatalog: leaf files are qualified paths") {
+ test("InMemoryFileIndex: leaf files are qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")
val path = new Path(file.getCanonicalPath)
- val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) {
+ val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
}
@@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext {
}
}
- test("ListingFileCatalog: input paths are converted to qualified paths") {
+ test("InMemoryFileIndex: input paths are converted to qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")
@@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext {
val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
require(qualifiedFilePath.toString.startsWith("file:"))
- val catalog1 = new ListingFileCatalog(
+ val catalog1 = new InMemoryFileIndex(
spark, Seq(unqualifiedDirPath), Map.empty, None)
assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
- val catalog2 = new ListingFileCatalog(
+ val catalog2 = new InMemoryFileIndex(
spark, Seq(unqualifiedFilePath), Map.empty, None)
assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
}
}
- test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+ test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
withTempDir { dir =>
val deletedFolder = new File(dir, "deleted")
assert(!deletedFolder.exists())
- val catalog1 = new ListingFileCatalog(
+ val catalog1 = new InMemoryFileIndex(
spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
// doesn't throw an exception
assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
}
}
- test("PartitioningAwareFileCatalog - file filtering") {
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+ test("PartitioningAwareFileIndex - file filtering") {
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
}
- test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+ test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
- extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+ extends PartitioningAwareFileIndex(spark, Map.empty, None) {
override def refresh(): Unit = {}
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c32254d..d900ce7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
util.stringToFile(file, fileName)
}
- val fileCatalog = new ListingFileCatalog(
+ val fileCatalog = new InMemoryFileIndex(
sparkSession = spark,
rootPaths = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f2a209e..120a3a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
case LogicalRelation(
- HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
+ HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) =>
assert(location.partitionSpec() === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 19c89f5..18b42a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog}
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest {
.add(StructField("id", IntegerType))
assert(outputDf.schema === expectedSchema)
- // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has
+ // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
baseRelation.asInstanceOf[HadoopFsRelation]
}
assert(hadoopdFsRelations.size === 1)
- assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog])
+ assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b9e9da9..47018b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val numFiles = 10000
// This is to avoid running a spark job to list of files in parallel
- // by the ListingFileCatalog.
+ // by the InMemoryFileIndex.
spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
withTempDirs { case (root, tmp) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org