Posted to commits@spark.apache.org by we...@apache.org on 2016/10/22 14:08:40 UTC
spark git commit: [SPARK-17994][SQL] Add back a file status cache for catalog tables
Repository: spark
Updated Branches:
refs/heads/master ab3363e9f -> 3eca283ac
[SPARK-17994][SQL] Add back a file status cache for catalog tables
## What changes were proposed in this pull request?
In SPARK-16980, we removed the full in-memory cache of table partitions in favor of loading only the needed partitions from the metastore. This greatly reduces the initial latency of queries that read only a small fraction of a table's partitions.
However, since the metastore does not store file statistics, we need to discover those from remote storage. With the loss of the in-memory file status cache, this discovery has to happen on every query, increasing the latency of repeated queries over the same partitions.
The proposal is to add back a per-table cache of partition contents, i.e. a `Map[Path, Array[FileStatus]]`. The cache is retained per table and can be invalidated through `refreshTable()` and `refreshByPath()`. Unlike the prior cache, it can be updated incrementally as new partitions are read.
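
As a rough illustration of the intended lookup pattern, the sketch below consults the per-table cache before falling back to remote listing. This is a hedged sketch, not the patch itself: `listFromRemoteStorage` is a hypothetical stand-in for the existing serial/parallel leaf-file listing, while `getLeafFiles`/`putLeafFiles` mirror the `FileStatusCache` trait added in this change.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.execution.datasources.FileStatusCache

// Sketch: serve paths from the cache when possible, discover the rest from
// remote storage, and record what was discovered for subsequent queries.
def listLeafFilesWithCache(
    paths: Seq[Path],
    cache: FileStatusCache,
    listFromRemoteStorage: Seq[Path] => Seq[(Path, Array[FileStatus])]): Seq[FileStatus] = {
  val (hits, misses) = paths.map(p => p -> cache.getLeafFiles(p)).partition(_._2.isDefined)
  val cached = hits.flatMap(_._2.get)                       // cache hits
  val discovered = listFromRemoteStorage(misses.map(_._1))  // cache misses
  discovered.foreach { case (path, files) => cache.putLeafFiles(path, files) }
  cached ++ discovered.flatMap(_._2)
}
```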
## How was this patch tested?
Existing tests and new tests in `HiveTablePerfStatsSuite`.
cc mallman
Author: Eric Liang <ek...@databricks.com>
Author: Michael Allman <mi...@videoamp.com>
Author: Eric Liang <ek...@gmail.com>
Closes #15539 from ericl/meta-cache.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eca283a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eca283a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eca283a
Branch: refs/heads/master
Commit: 3eca283aca68ac81c127d60ad5699f854d5f14b7
Parents: ab3363e
Author: Eric Liang <ek...@databricks.com>
Authored: Sat Oct 22 22:08:28 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Oct 22 22:08:28 2016 +0800
----------------------------------------------------------------------
.../spark/metrics/source/StaticSources.scala | 7 +
.../execution/datasources/FileStatusCache.scala | 149 ++++++++++++
.../datasources/ListingFileCatalog.scala | 13 +-
.../PartitioningAwareFileCatalog.scala | 115 +++++----
.../datasources/TableFileCatalog.scala | 36 +--
.../org/apache/spark/sql/internal/SQLConf.scala | 16 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +-
.../spark/sql/hive/HiveDDLCommandSuite.scala | 16 +-
.../spark/sql/hive/HiveDataFrameSuite.scala | 145 -----------
.../sql/hive/HiveTablePerfStatsSuite.scala | 240 +++++++++++++++++++
10 files changed, 514 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index cf92a10..b54885b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -81,14 +81,21 @@ object HiveCatalogMetrics extends Source {
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
/**
+ * Tracks the total number of files served from the file status cache instead of discovered.
+ */
+ val METRIC_FILE_CACHE_HITS = metricRegistry.counter(MetricRegistry.name("fileCacheHits"))
+
+ /**
* Resets the values of all metrics to zero. This is useful in tests.
*/
def reset(): Unit = {
METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+ METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
}
// clients can use these to avoid classloader issues with the codahale classes
def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
+ def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
new file mode 100644
index 0000000..e0ec748
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated.
+ */
+abstract class FileStatusCache {
+ /**
+ * @return the leaf files for the specified path from this cache, or None if not cached.
+ */
+ def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+ /**
+ * Saves the given set of leaf files for a path in this cache.
+ */
+ def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+ /**
+ * Invalidates all data held by this cache.
+ */
+ def invalidateAll(): Unit
+}
+
+object FileStatusCache {
+ private var sharedCache: SharedInMemoryCache = null
+
+ /**
+ * @return a new FileStatusCache based on session configuration. Cache memory quota is
+ * shared across all clients.
+ */
+ def newCache(session: SparkSession): FileStatusCache = {
+ synchronized {
+ if (session.sqlContext.conf.filesourcePartitionPruning &&
+ session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+ if (sharedCache == null) {
+ sharedCache = new SharedInMemoryCache(
+ session.sqlContext.conf.filesourcePartitionFileCacheSize)
+ }
+ sharedCache.getForNewClient()
+ } else {
+ NoopCache
+ }
+ }
+ }
+
+ def resetForTesting(): Unit = synchronized {
+ sharedCache = null
+ }
+}
+
+/**
+ * An implementation that caches partition file statuses in memory.
+ *
+ * @param maxSizeInBytes max allowable cache size before entries start getting evicted
+ */
+private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
+ import FileStatusCache._
+
+ // Opaque object that uniquely identifies a shared cache user
+ private type ClientId = Object
+
+ private val warnedAboutEviction = new AtomicBoolean(false)
+
+ // we use a composite cache key in order to distinguish entries inserted by different clients
+ private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
+ .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+ override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
+ (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
+ }})
+ .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+ override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]) = {
+ if (removed.getCause() == RemovalCause.SIZE &&
+ warnedAboutEviction.compareAndSet(false, true)) {
+ logWarning(
+ "Evicting cached table partition metadata from memory due to size constraints " +
+ "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
+ "This may impact query planning performance.")
+ }
+ }})
+ .maximumWeight(maxSizeInBytes)
+ .build()
+
+ /**
+ * @return a FileStatusCache that does not share any entries with any other client, but does
+ * share memory resources for the purpose of cache eviction.
+ */
+ def getForNewClient(): FileStatusCache = new FileStatusCache {
+ val clientId = new Object()
+
+ override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+ Option(cache.getIfPresent((clientId, path)))
+ }
+
+ override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
+ cache.put((clientId, path), leafFiles.toArray)
+ }
+
+ override def invalidateAll(): Unit = {
+ cache.asMap.asScala.foreach { case (key, value) =>
+ if (key._1 == clientId) {
+ cache.invalidate(key)
+ }
+ }
+ }
+ }
+}
+
+/**
+ * A non-caching implementation used when partition file status caching is disabled.
+ */
+object NoopCache extends FileStatusCache {
+ override def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+ override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {}
+ override def invalidateAll(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 6d10501..d9d5883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -38,14 +38,16 @@ class ListingFileCatalog(
sparkSession: SparkSession,
override val rootPaths: Seq[Path],
parameters: Map[String, String],
- partitionSchema: Option[StructType])
- extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
+ partitionSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache)
+ extends PartitioningAwareFileCatalog(
+ sparkSession, parameters, partitionSchema, fileStatusCache) {
@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
@volatile private var cachedPartitionSpec: PartitionSpec = _
- refresh()
+ refresh0()
override def partitionSpec(): PartitionSpec = {
if (cachedPartitionSpec == null) {
@@ -64,6 +66,11 @@ class ListingFileCatalog(
}
override def refresh(): Unit = {
+ refresh0()
+ fileStatusCache.invalidateAll()
+ }
+
+ private def refresh0(): Unit = {
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5c8eff7..9b1903c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
-
/**
* An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
* It provides the necessary methods to parse partition data based on a set of files.
@@ -45,7 +44,8 @@ import org.apache.spark.util.SerializableConfiguration
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
- partitionSchema: Option[StructType]) extends FileCatalog with Logging {
+ partitionSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
/** Returns the specification of the partitions inferred from the data. */
@@ -238,15 +238,29 @@ abstract class PartitioningAwareFileCatalog(
* This is publicly visible for testing.
*/
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- val files =
- if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
- } else {
- PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+ val output = mutable.LinkedHashSet[FileStatus]()
+ val pathsToFetch = mutable.ArrayBuffer[Path]()
+ for (path <- paths) {
+ fileStatusCache.getLeafFiles(path) match {
+ case Some(files) =>
+ HiveCatalogMetrics.incrementFileCacheHits(files.length)
+ output ++= files
+ case None =>
+ pathsToFetch += path
}
-
- HiveCatalogMetrics.incrementFilesDiscovered(files.size)
- mutable.LinkedHashSet(files: _*)
+ }
+ val discovered = if (pathsToFetch.length >=
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+ PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
+ } else {
+ PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
+ }
+ discovered.foreach { case (path, leafFiles) =>
+ HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+ fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+ output ++= leafFiles
+ }
+ output
}
}
@@ -276,14 +290,14 @@ object PartitioningAwareFileCatalog extends Logging {
*/
private def listLeafFilesInSerial(
paths: Seq[Path],
- hadoopConf: Configuration): Seq[FileStatus] = {
+ hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(hadoopConf, this.getClass)
val filter = FileInputFormat.getInputPathFilter(jobConf)
- paths.flatMap { path =>
+ paths.map { path =>
val fs = path.getFileSystem(hadoopConf)
- listLeafFiles0(fs, path, filter)
+ (path, listLeafFiles0(fs, path, filter))
}
}
@@ -294,7 +308,7 @@ object PartitioningAwareFileCatalog extends Logging {
private def listLeafFilesInParallel(
paths: Seq[Path],
hadoopConf: Configuration,
- sparkSession: SparkSession): Seq[FileStatus] = {
+ sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
@@ -306,47 +320,54 @@ object PartitioningAwareFileCatalog extends Logging {
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, 10000)
- val statuses = sparkContext
+ val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
.mapPartitions { paths =>
val hadoopConf = serializableConfiguration.value
listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
- }.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
+ }.map { case (path, statuses) =>
+ val serializableStatuses = statuses.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }
+ (path.toString, serializableStatuses)
}.collect()
- // Turn SerializableFileStatus back to Status
- statuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ // turn SerializableFileStatus back to Status
+ statusMap.map { case (path, serializableStatuses) =>
+ val statuses = serializableStatuses.map { f =>
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
+ new Path(f.path)),
+ blockLocations)
}
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
- blockLocations)
+ (new Path(path), statuses)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index fc08c37..31a01bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType
* @param table the table's (unqualified) name
* @param partitionSchema the schema of a partitioned table's partition columns
* @param sizeInBytes the table's data size in bytes
+ * @param fileStatusCache optional cache implementation to use for file listing
*/
class TableFileCatalog(
sparkSession: SparkSession,
@@ -42,24 +43,21 @@ class TableFileCatalog(
protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+ private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+
private val externalCatalog = sparkSession.sharedState.externalCatalog
private val catalogTable = externalCatalog.getTable(db, table)
private val baseLocation = catalogTable.storage.locationUri
- // Populated on-demand by calls to cachedAllPartitions
- private var cachedAllPartitions: ListingFileCatalog = null
-
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
}
- override def refresh(): Unit = synchronized {
- cachedAllPartitions = null
- }
+ override def refresh(): Unit = fileStatusCache.invalidateAll()
/**
* Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -68,14 +66,6 @@ class TableFileCatalog(
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
- if (filters.isEmpty) {
- allPartitions
- } else {
- filterPartitions0(filters)
- }
- }
-
- private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog = {
val parameters = baseLocation
.map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
.getOrElse(Map.empty)
@@ -87,21 +77,13 @@ class TableFileCatalog(
}
val partitionSpec = PartitionSpec(schema, partitions)
new PrunedTableFileCatalog(
- sparkSession, new Path(baseLocation.get), partitionSpec)
+ sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
case None =>
- new ListingFileCatalog(sparkSession, rootPaths, parameters, None)
- }
- }
-
- // Not used in the hot path of queries when metastore partition pruning is enabled
- def allPartitions: ListingFileCatalog = synchronized {
- if (cachedAllPartitions == null) {
- cachedAllPartitions = filterPartitions0(Nil)
+ new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache)
}
- cachedAllPartitions
}
- override def inputFiles: Array[String] = allPartitions.inputFiles
+ override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
}
/**
@@ -114,9 +96,11 @@ class TableFileCatalog(
private class PrunedTableFileCatalog(
sparkSession: SparkSession,
tableBasePath: Path,
+ fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec)
extends ListingFileCatalog(
sparkSession,
partitionSpec.partitions.map(_.path),
Map.empty,
- Some(partitionSpec.partitionColumns))
+ Some(partitionSpec.partitionColumns),
+ fileStatusCache)
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ebf4fad..a6e2fa2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -265,17 +265,27 @@ object SQLConf {
val HIVE_METASTORE_PARTITION_PRUNING =
SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
- "unmatching partitions can be eliminated earlier.")
+ "unmatching partitions can be eliminated earlier. This only affects Hive tables " +
+ "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
+ "HiveUtils.CONVERT_METASTORE_ORC for more information).")
.booleanConf
.createWithDefault(true)
val HIVE_FILESOURCE_PARTITION_PRUNING =
SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
- .doc("When true, enable metastore partition pruning for file source tables as well. " +
+ .doc("When true, enable metastore partition pruning for filesource relations as well. " +
"This is currently implemented for converted Hive tables only.")
.booleanConf
.createWithDefault(true)
+ val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
+ SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
+ .doc("When nonzero, enable caching of partition file metadata in memory. All table share " +
+ "a cache that can use up to specified num bytes for file metadata. This conf only " +
+ "applies if filesource partition pruning is also enabled.")
+ .longConf
+ .createWithDefault(250 * 1024 * 1024)
+
val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -670,6 +680,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+ def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
+
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
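
For illustration, the new cache size knob could be tuned per session roughly as follows. This is a sketch assuming an active `SparkSession` named `spark`; the default added by this patch is 250 MB.

```scala
// Sketch: enable filesource partition pruning and give the shared file status
// cache a 512 MB budget instead of the 250 MB default.
spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "true")
spark.conf.set("spark.sql.hive.filesourcePartitionFileCacheSize", (512L * 1024 * 1024).toString)
```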
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c909eb5..4408933 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
if (lazyPruningEnabled) {
catalog
} else {
- catalog.allPartitions
+ catalog.filterPartitions(Nil) // materialize all the partitions in memory
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 8133749..d13e29b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
}
- }
+ }
+
+ test("table name with schema") {
+ // regression test for SPARK-11778
+ spark.sql("create schema usrdb")
+ spark.sql("create table usrdb.test(c int)")
+ spark.read.table("usrdb.test")
+ spark.sql("drop table usrdb.test")
+ spark.sql("drop schema usrdb")
+ }
+
+ test("SPARK-15887: hive-site.xml should be loaded") {
+ val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ assert(hiveClient.getConf("hive.in.test", "") == "true")
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
deleted file mode 100644
index 1552343..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
- test("table name with schema") {
- // regression test for SPARK-11778
- spark.sql("create schema usrdb")
- spark.sql("create table usrdb.test(c int)")
- spark.read.table("usrdb.test")
- spark.sql("drop table usrdb.test")
- spark.sql("drop schema usrdb")
- }
-
- test("SPARK-15887: hive-site.xml should be loaded") {
- val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- assert(hiveClient.getConf("hive.in.test", "") == "true")
- }
-
- private def setupPartitionedTable(tableName: String, dir: File): Unit = {
- spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
- .partitionBy("partCol1", "partCol2")
- .mode("overwrite")
- .parquet(dir.getAbsolutePath)
-
- spark.sql(s"""
- |create external table $tableName (id long)
- |partitioned by (partCol1 int, partCol2 int)
- |stored as parquet
- |location "${dir.getAbsolutePath}"""".stripMargin)
- spark.sql(s"msck repair table $tableName")
- }
-
- test("partitioned pruned table reports only selected files") {
- assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- val df = spark.sql("select * from test")
- assert(df.count() == 5)
- assert(df.inputFiles.length == 5) // unpruned
-
- val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4")
- assert(df2.count() == 2)
- assert(df2.inputFiles.length == 2) // pruned, so we have less files
-
- val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4")
- assert(df3.count() == 2)
- assert(df3.inputFiles.length == 2)
-
- val df4 = spark.sql("select * from test where partCol1 = 999")
- assert(df4.count() == 0)
- assert(df4.inputFiles.length == 0)
- }
- }
- }
-
- test("lazy partition pruning reads only necessary partition data") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 = 999").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 < 2").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 < 3").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
-
- // should read all
- HiveCatalogMetrics.reset()
- spark.sql("select * from test").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
- // read all should be cached
- HiveCatalogMetrics.reset()
- spark.sql("select * from test").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
- }
- }
- }
- }
-
- test("all partitions read and cached when filesource partition pruning is off") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
-
- // We actually query the partitions from hive each time the table is resolved in this
- // mode. This is kind of terrible, but is needed to preserve the legacy behavior
- // of doing plan cache validation based on the entire partition set.
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 = 999").count()
- // 5 from table resolution, another 5 from ListingFileCatalog
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 < 2").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
new file mode 100644
index 0000000..82ee813
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveTablePerfStatsSuite
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ FileStatusCache.resetForTesting()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ FileStatusCache.resetForTesting()
+ }
+
+ private def setupPartitionedTable(tableName: String, dir: File): Unit = {
+ spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
+ .partitionBy("partCol1", "partCol2")
+ .mode("overwrite")
+ .parquet(dir.getAbsolutePath)
+
+ spark.sql(s"""
+ |create external table $tableName (id long)
+ |partitioned by (partCol1 int, partCol2 int)
+ |stored as parquet
+ |location "${dir.getAbsolutePath}"""".stripMargin)
+ spark.sql(s"msck repair table $tableName")
+ }
+
+ test("partitioned pruned table reports only selected files") {
+ assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+ val df = spark.sql("select * from test")
+ assert(df.count() == 5)
+ assert(df.inputFiles.length == 5) // unpruned
+
+ val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4")
+ assert(df2.count() == 2)
+ assert(df2.inputFiles.length == 2) // pruned, so we have fewer files
+
+ val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4")
+ assert(df3.count() == 2)
+ assert(df3.inputFiles.length == 2)
+
+ val df4 = spark.sql("select * from test where partCol1 = 999")
+ assert(df4.count() == 0)
+ assert(df4.inputFiles.length == 0)
+ }
+ }
+ }
+
+ test("lazy partition pruning reads only necessary partition data") {
+ withSQLConf(
+ SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+ SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+ HiveCatalogMetrics.reset()
+ spark.sql("select * from test where partCol1 = 999").count()
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ spark.sql("select * from test where partCol1 < 2").count()
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+
+ HiveCatalogMetrics.reset()
+ spark.sql("select * from test where partCol1 < 3").count()
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
+
+ // should read all
+ HiveCatalogMetrics.reset()
+ spark.sql("select * from test").count()
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+ // read all should not be cached
+ HiveCatalogMetrics.reset()
+ spark.sql("select * from test").count()
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+ // cache should be disabled
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ }
+ }
+ }
+ }
+
+ test("lazy partition pruning with file status caching enabled") {
+ withSQLConf(
+ "spark.sql.hive.filesourcePartitionPruning" -> "true",
+ "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 < 3").count() == 3)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+ }
+ }
+ }
+ }
+
+ test("file status caching respects refresh table and refreshByPath") {
+ withSQLConf(
+ "spark.sql.hive.filesourcePartitionPruning" -> "true",
+ "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ spark.sql("refresh table test")
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+ spark.catalog.cacheTable("test")
+ HiveCatalogMetrics.reset()
+ spark.catalog.refreshByPath(dir.getAbsolutePath)
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ }
+ }
+ }
+ }
+
+ test("file status cache respects size limit") {
+ withSQLConf(
+ "spark.sql.hive.filesourcePartitionPruning" -> "true",
+ "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ }
+ }
+ }
+ }
+
+ test("all partitions read and cached when filesource partition pruning is off") {
+ withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedTable("test", dir)
+
+ // We actually query the partitions from hive each time the table is resolved in this
+ // mode. This is kind of terrible, but is needed to preserve the legacy behavior
+ // of doing plan cache validation based on the entire partition set.
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
+ // 5 from table resolution, another 5 from ListingFileCatalog
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+ HiveCatalogMetrics.reset()
+ assert(spark.sql("select * from test").count() == 5)
+ assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ }
+ }
+ }
+ }
+}