Posted to commits@spark.apache.org by we...@apache.org on 2018/04/13 16:23:01 UTC
spark git commit: [SPARK-23896][SQL] Improve PartitioningAwareFileIndex
Repository: spark
Updated Branches:
refs/heads/master a83ae0d9b -> 4dfd746de
[SPARK-23896][SQL] Improve PartitioningAwareFileIndex
## What changes were proposed in this pull request?
Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it combines the inferred partition schema with that parameter.
However,
1. to get `userPartitionSchema`, we need to combine the inferred partition schema with `userSpecifiedSchema`;
2. to get the inferred partition schema, we have to create a temporary file index.
Only after that can the final `PartitioningAwareFileIndex` be created.
This can be improved by passing `userSpecifiedSchema` directly to `PartitioningAwareFileIndex`.
With this improvement, we reduce redundant code and avoid parsing the file partitions twice.
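As a rough before/after sketch (illustrative only; `spark`, `paths`, `options`, and `userSpecifiedSchema` are assumed to be in scope, `combinePartitionSchema` stands in for the removed `combineInferredAndUserSpecifiedPartitionSchema` helper, and the real constructors also take a `FileStatusCache`):

```scala
// Before: a throwaway index just to infer partition columns, then a second index
// built with the merged schema.
val tempIndex = new InMemoryFileIndex(spark, paths, options, partitionSchema = None)
val merged    = combinePartitionSchema(tempIndex.partitionSchema, userSpecifiedSchema)
val oldIndex  = new InMemoryFileIndex(spark, paths, options, partitionSchema = Some(merged))

// After: one index, which resolves partition column types against the user schema itself.
val newIndex  = new InMemoryFileIndex(spark, paths, options, userSpecifiedSchema = userSpecifiedSchema)
```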
## How was this patch tested?
Unit test
Author: Gengliang Wang <ge...@databricks.com>
Closes #21004 from gengliangwang/PartitioningAwareFileIndex.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dfd746d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dfd746d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dfd746d
Branch: refs/heads/master
Commit: 4dfd746de3f4346ed0c2191f8523a7e6cc9f064d
Parents: a83ae0d
Author: Gengliang Wang <ge...@databricks.com>
Authored: Sat Apr 14 00:22:38 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Apr 14 00:22:38 2018 +0800
----------------------------------------------------------------------
.../datasources/CatalogFileIndex.scala | 2 +-
.../sql/execution/datasources/DataSource.scala | 133 ++++++++-----------
.../datasources/InMemoryFileIndex.scala | 8 +-
.../PartitioningAwareFileIndex.scala | 54 +++++---
.../streaming/MetadataLogFileIndex.scala | 10 +-
.../datasources/FileSourceStrategySuite.scala | 2 +-
.../hive/PartitionedTablePerfStatsSuite.scala | 2 +-
7 files changed, 103 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4046396..a66a076 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -85,7 +85,7 @@ class CatalogFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
- sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
+ sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b84ea76..f16d824 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
@@ -104,24 +103,6 @@ case class DataSource(
}
/**
- * In the read path, only managed tables by Hive provide the partition columns properly when
- * initializing this class. All other file based data sources will try to infer the partitioning,
- * and then cast the inferred types to user specified dataTypes if the partition columns exist
- * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
- * inconsistent data types as reported in SPARK-21463.
- * @param fileIndex A FileIndex that will perform partition inference
- * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
- */
- private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
- val resolved = fileIndex.partitionSchema.map { partitionField =>
- // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
- userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
- partitionField)
- }
- StructType(resolved)
- }
-
- /**
* Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
* it. In the read path, only managed tables by Hive provide the partition columns properly when
* initializing this class. All other file based data sources will try to infer the partitioning,
@@ -140,31 +121,26 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
- * @param fileStatusCache the shared cache for file statuses to speed up listing
+ * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
- fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
- // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+ fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
+ // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
- lazy val tempFileIndex = {
- val allPaths = caseInsensitiveOptions.get("path") ++ paths
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val globbedPaths = allPaths.toSeq.flatMap { path =>
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(hadoopConf)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
- }.toArray
- new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
+ lazy val tempFileIndex = fileIndex.getOrElse {
+ val globbedPaths =
+ checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+ createInMemoryFileIndex(globbedPaths)
}
+
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
- combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
+ tempFileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
@@ -356,13 +332,7 @@ case class DataSource(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
- val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
- val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
- val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
- new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
- } else {
- tempFileCatalog
- }
+ val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -384,24 +354,23 @@ case class DataSource(
// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
- val allPaths = caseInsensitiveOptions.get("path") ++ paths
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
- val globbedPaths = allPaths.flatMap(
- DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
- val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
- val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
- val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
- catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+ val globbedPaths =
+ checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+ val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+ catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
+ catalogTable.get.partitionColumnNames.nonEmpty
+ val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
- new CatalogFileIndex(
+ val index = new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+ (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
} else {
- new InMemoryFileIndex(
- sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
+ val index = createInMemoryFileIndex(globbedPaths)
+ val (resultDataSchema, resultPartitionSchema) =
+ getOrInferFileFormatSchema(format, Some(index))
+ (index, resultDataSchema, resultPartitionSchema)
}
HadoopFsRelation(
@@ -552,6 +521,40 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
+
+ /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+ private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+ new InMemoryFileIndex(
+ sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
+ }
+
+ /**
+ * Checks and returns files in all the paths.
+ */
+ private def checkAndGlobPathIfNecessary(
+ checkEmptyGlobPath: Boolean,
+ checkFilesExist: Boolean): Seq[Path] = {
+ val allPaths = caseInsensitiveOptions.get("path") ++ paths
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ allPaths.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+ if (checkEmptyGlobPath && globPath.isEmpty) {
+ throw new AnalysisException(s"Path does not exist: $qualified")
+ }
+
+ // Sufficient to check head of the globPath seq for non-glob scenario
+ // Don't need to check once again if files exist in streaming mode
+ if (checkFilesExist && !fs.exists(globPath.head)) {
+ throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+ }
+ globPath
+ }.toSeq
+ }
}
object DataSource extends Logging {
@@ -700,30 +703,6 @@ object DataSource extends Logging {
}
/**
- * If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
- * If `checkFilesExist` is `true`, also check the file existence.
- */
- private def checkAndGlobPathIfNecessary(
- hadoopConf: Configuration,
- path: String,
- checkFilesExist: Boolean): Seq[Path] = {
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(hadoopConf)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
- if (globPath.isEmpty) {
- throw new AnalysisException(s"Path does not exist: $qualified")
- }
- // Sufficient to check head of the globPath seq for non-glob scenario
- // Don't need to check once again if files exist in streaming mode
- if (checkFilesExist && !fs.exists(globPath.head)) {
- throw new AnalysisException(s"Path does not exist: ${globPath.head}")
- }
- globPath
- }
-
- /**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.
* @param schema
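The new private `checkAndGlobPathIfNecessary` above centralizes path resolution for both call sites in `DataSource`. A self-contained approximation using only public Hadoop `FileSystem` APIs (illustrative; the real helper uses `SparkHadoopUtil.globPathIfNecessary` and throws `AnalysisException`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Qualify each path, expand glob patterns, and optionally verify existence.
def resolvePaths(
    paths: Seq[String],
    hadoopConf: Configuration,
    checkEmptyGlobPath: Boolean,
    checkFilesExist: Boolean): Seq[Path] = {
  paths.flatMap { p =>
    val hdfsPath = new Path(p)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    val qualified = fs.makeQualified(hdfsPath)
    // globStatus expands patterns like /data/date=2018-*; for a plain path it
    // returns a single entry, or null/empty if nothing matches.
    val globbed = Option(fs.globStatus(qualified)).toSeq.flatten.map(_.getPath)
    if (checkEmptyGlobPath && globbed.isEmpty) {
      sys.error(s"Path does not exist: $qualified") // AnalysisException in the real code
    }
    // Checking the head is enough for the non-glob case; streaming skips this check.
    if (checkFilesExist && globbed.nonEmpty && !fs.exists(globbed.head)) {
      sys.error(s"Path does not exist: ${globbed.head}")
    }
    globbed
  }
}
```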
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 318ada0..739d1f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration
* @param rootPathsSpecified the list of root table paths to scan (some of which might be
* filtered out later)
* @param parameters as set of options to control discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- * discovered partitions
+ * @param userSpecifiedSchema an optional user-specified schema that will be used to provide
+ * types for the discovered partitions
*/
class InMemoryFileIndex(
sparkSession: SparkSession,
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
- partitionSchema: Option[StructType],
+ userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends PartitioningAwareFileIndex(
- sparkSession, parameters, partitionSchema, fileStatusCache) {
+ sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
// Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
// or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 6b6f638..cc8af7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType}
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- * the discovered partitions
+ * @param userSpecifiedSchema an optional user-specified schema that will be used to provide
+ * types for the discovered partitions
*/
abstract class PartitioningAwareFileIndex(
sparkSession: SparkSession,
parameters: Map[String, String],
- userPartitionSchema: Option[StructType],
+ userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
import PartitioningAwareFileIndex.BASE_PATH_PARAM
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
- userPartitionSchema match {
+ val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+ leafDirs,
+ typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+ basePaths = basePaths,
+ timeZoneId = timeZoneId)
+ userSpecifiedSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
- val spec = PartitioningUtils.parsePartitions(
- leafDirs,
- typeInference = false,
- basePaths = basePaths,
- timeZoneId = timeZoneId)
+ val userPartitionSchema =
+ combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
- // Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
+ val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
Cast(
- Literal.create(row.getUTF8String(i), StringType),
- userProvidedSchema.fields(i).dataType,
+ Literal.create(row.get(i, dt), dt),
+ userPartitionSchema.fields(i).dataType,
Option(timeZoneId)).eval()
}: _*)
}
- PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+ PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part =>
part.copy(values = castPartitionValuesToUserSchema(part.values))
})
case _ =>
- PartitioningUtils.parsePartitions(
- leafDirs,
- typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
- basePaths = basePaths,
- timeZoneId = timeZoneId)
+ inferredPartitionSpec
}
}
@@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
+
+ /**
+ * In the read path, only managed tables by Hive provide the partition columns properly when
+ * initializing this class. All other file based data sources will try to infer the partitioning,
+ * and then cast the inferred types to user specified dataTypes if the partition columns exist
+ * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
+ * inconsistent data types as reported in SPARK-21463.
+ * @param spec A partition inference result
+ * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
+ */
+ private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
+ val equality = sparkSession.sessionState.conf.resolver
+ val resolved = spec.partitionColumns.map { partitionField =>
+ // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+ userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+ partitionField)
+ }
+ StructType(resolved)
+ }
}
object PartitioningAwareFileIndex {
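From a user's point of view, the behavior this preserves (hedged example with made-up column and path names, assuming a `SparkSession` named `spark`): when the supplied schema contains a partition column, the discovered partition values are cast to the user-specified type rather than keeping the inferred one (SPARK-18510, SPARK-21463).

```scala
import org.apache.spark.sql.types._

// Hypothetical layout: /data/events/date=2018-04-14/part-*.parquet
val userSchema = new StructType()
  .add("id", LongType)
  .add("value", DoubleType)
  .add("date", StringType) // partition column: force string instead of the inferred date type

// The "date" column is discovered from the directory names, but its type is taken
// from userSchema via combineInferredAndUserSpecifiedPartitionSchema.
val df = spark.read.schema(userSchema).parquet("/data/events")
```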
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 1da703c..5cacdd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType
* A [[FileIndex]] that generates the list of files to processing by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- * the discovered partitions
+ * @param userSpecifiedSchema an optional user-specified schema that will be used to provide
+ * types for the discovered partitions
*/
class MetadataLogFileIndex(
sparkSession: SparkSession,
path: Path,
- userPartitionSchema: Option[StructType])
- extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
+ userSpecifiedSchema: Option[StructType])
+ extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
@@ -51,7 +51,7 @@ class MetadataLogFileIndex(
}
override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
- allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+ allFilesFromLog.groupBy(_.getPath.getParent)
}
override def rootPaths: Seq[Path] = path :: Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c1d61b8..8764f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
sparkSession = spark,
rootPathsSpecified = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
- partitionSchema = None)
+ userSpecifiedSchema = None)
// This should not fail.
fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 1a86c60..3af163a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}