You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/04/10 19:56:27 UTC
spark git commit: [SPARK-20280][CORE] FileStatusCache Weigher integer
overflow
Repository: spark
Updated Branches:
refs/heads/master a26e3ed5e -> f6dd8e0e1
[SPARK-20280][CORE] FileStatusCache Weigher integer overflow
## What changes were proposed in this pull request?
Weigher.weigh needs to return Int, but it is possible for an Array[FileStatus] to have an estimated size (in bytes, as computed by SizeEstimator) greater than Int.MaxValue. To avoid this, the size is scaled down by a factor of 32 before being returned as the weight. The maximumWeight of the cache is also scaled down by the same factor, so the effective byte budget is unchanged.
## How was this patch tested?
New test in FileIndexSuite
Author: Bogdan Raducanu <bo...@databricks.com>
Closes #17591 from bogdanrdc/SPARK-20280.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6dd8e0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6dd8e0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6dd8e0e
Branch: refs/heads/master
Commit: f6dd8e0e1673aa491b895c1f0467655fa4e9d52f
Parents: a26e3ed
Author: Bogdan Raducanu <bo...@databricks.com>
Authored: Mon Apr 10 21:56:21 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Apr 10 21:56:21 2017 +0200
----------------------------------------------------------------------
.../execution/datasources/FileStatusCache.scala | 47 ++++++++++++++------
.../execution/datasources/FileIndexSuite.scala | 16 +++++++
2 files changed, 50 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f6dd8e0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 5d97558..aea27bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
// Opaque object that uniquely identifies a shared cache user
private type ClientId = Object
+
private val warnedAboutEviction = new AtomicBoolean(false)
// we use a composite cache key in order to distinguish entries inserted by different clients
- private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
- .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+ private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+ // [[Weigher]].weigh returns Int so we could only cache objects < 2GB
+ // instead, the weight is divided by this factor (which is smaller
+ // than the size of one [[FileStatus]]).
+ // so it will support objects up to 64GB in size.
+ val weightScale = 32
+ val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
- (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
- }})
- .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
- override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
- : Unit = {
+ val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+ if (estimate > Int.MaxValue) {
+ logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+ s"${Int.MaxValue.toLong * weightScale}.")
+ Int.MaxValue
+ } else {
+ estimate.toInt
+ }
+ }
+ }
+ val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+ override def onRemoval(
+ removed: RemovalNotification[(ClientId, Path),
+ Array[FileStatus]]): Unit = {
if (removed.getCause == RemovalCause.SIZE &&
- warnedAboutEviction.compareAndSet(false, true)) {
+ warnedAboutEviction.compareAndSet(false, true)) {
logWarning(
"Evicting cached table partition metadata from memory due to size constraints " +
- "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
- "This may impact query planning performance.")
+ "(spark.sql.hive.filesourcePartitionFileCacheSize = "
+ + maxSizeInBytes + " bytes). This may impact query planning performance.")
}
- }})
- .maximumWeight(maxSizeInBytes)
- .build[(ClientId, Path), Array[FileStatus]]()
+ }
+ }
+ CacheBuilder.newBuilder()
+ .weigher(weigher)
+ .removalListener(removalListener)
+ .maximumWeight(maxSizeInBytes / weightScale)
+ .build[(ClientId, Path), Array[FileStatus]]()
+ }
+
/**
* @return a FileStatusCache that does not share any entries with any other client, but does
http://git-wip-us.apache.org/repos/asf/spark/blob/f6dd8e0e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 00f5d5d..a9511cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
class FileIndexSuite extends SharedSQLContext {
@@ -220,6 +221,21 @@ class FileIndexSuite extends SharedSQLContext {
assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
}
}
+
+ test("SPARK-20280 - FileStatusCache with a partition with very many files") {
+ /* fake the size, otherwise we need to allocate 2GB of data to trigger this bug */
+ class MyFileStatus extends FileStatus with KnownSizeEstimation {
+ override def estimatedSize: Long = 1000 * 1000 * 1000
+ }
+ /* files * MyFileStatus.estimatedSize should overflow to negative integer
+ * so, make it between 2bn and 4bn
+ */
+ val files = (1 to 3).map { i =>
+ new MyFileStatus()
+ }
+ val fileStatusCache = FileStatusCache.getOrCreate(spark)
+ fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
+ }
}
class FakeParentPathFileSystem extends RawLocalFileSystem {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org