You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/04/10 19:56:27 UTC

spark git commit: [SPARK-20280][CORE] FileStatusCache Weigher integer overflow

Repository: spark
Updated Branches:
  refs/heads/master a26e3ed5e -> f6dd8e0e1


[SPARK-20280][CORE] FileStatusCache Weigher integer overflow

## What changes were proposed in this pull request?

Weigher.weigh must return an Int, but the estimated size in bytes of an Array[FileStatus] can exceed Int.MaxValue. To avoid the resulting overflow, the estimated size is scaled down by a factor of 32 before it is used as the weight. The maximumWeight of the cache is scaled down by the same factor.

## How was this patch tested?
New test in FileIndexSuite that caches a partition whose estimated size exceeds Int.MaxValue.

Author: Bogdan Raducanu <bo...@databricks.com>

Closes #17591 from bogdanrdc/SPARK-20280.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6dd8e0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6dd8e0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6dd8e0e

Branch: refs/heads/master
Commit: f6dd8e0e1673aa491b895c1f0467655fa4e9d52f
Parents: a26e3ed
Author: Bogdan Raducanu <bo...@databricks.com>
Authored: Mon Apr 10 21:56:21 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Apr 10 21:56:21 2017 +0200

----------------------------------------------------------------------
 .../execution/datasources/FileStatusCache.scala | 47 ++++++++++++++------
 .../execution/datasources/FileIndexSuite.scala  | 16 +++++++
 2 files changed, 50 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6dd8e0e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 5d97558..aea27bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -94,27 +94,48 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
 
+
   private val warnedAboutEviction = new AtomicBoolean(false)
 
   // we use a composite cache key in order to distinguish entries inserted by different clients
-  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder()
-    .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = {
+    // [[Weigher]].weigh returns an Int, so an unscaled weight could only describe objects
+    // smaller than 2GB. Instead, the estimated size is divided by this factor (which is
+    // smaller than the size of one [[FileStatus]]), so objects of up to 64GB in size
+    // are supported.
+    val weightScale = 32
+    val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] {
       override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = {
-        (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
-      }})
-    .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
-      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
-        : Unit = {
+        val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale
+        if (estimate > Int.MaxValue) {
+          logWarning(s"Cached table partition metadata size is too big. Approximating to " +
+            s"${Int.MaxValue.toLong * weightScale}.")
+          Int.MaxValue
+        } else {
+          estimate.toInt
+        }
+      }
+    }
+    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
+      override def onRemoval(
+          removed: RemovalNotification[(ClientId, Path),
+          Array[FileStatus]]): Unit = {
         if (removed.getCause == RemovalCause.SIZE &&
-            warnedAboutEviction.compareAndSet(false, true)) {
+          warnedAboutEviction.compareAndSet(false, true)) {
           logWarning(
             "Evicting cached table partition metadata from memory due to size constraints " +
-            "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " +
-            "This may impact query planning performance.")
+              "(spark.sql.hive.filesourcePartitionFileCacheSize = "
+              + maxSizeInBytes + " bytes). This may impact query planning performance.")
         }
-      }})
-    .maximumWeight(maxSizeInBytes)
-    .build[(ClientId, Path), Array[FileStatus]]()
+      }
+    }
+    CacheBuilder.newBuilder()
+      .weigher(weigher)
+      .removalListener(removalListener)
+      .maximumWeight(maxSizeInBytes / weightScale)
+      .build[(ClientId, Path), Array[FileStatus]]()
+  }
+
 
   /**
    * @return a FileStatusCache that does not share any entries with any other client, but does

http://git-wip-us.apache.org/repos/asf/spark/blob/f6dd8e0e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 00f5d5d..a9511cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
 
 class FileIndexSuite extends SharedSQLContext {
 
@@ -220,6 +221,21 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
     }
   }
+
+  test("SPARK-20280 - FileStatusCache with a partition with very many files") {
+    /* fake the size, otherwise we need to allocate 2GB of data to trigger this bug */
+    class MyFileStatus extends FileStatus with KnownSizeEstimation {
+      override def estimatedSize: Long = 1000 * 1000 * 1000
+    }
+    /* files.size * MyFileStatus.estimatedSize must overflow to a negative Int,
+     * so the total estimated size has to be between 2bn and 4bn
+     */
+    val files = (1 to 3).map { i =>
+      new MyFileStatus()
+    }
+    val fileStatusCache = FileStatusCache.getOrCreate(spark)
+    fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
+  }
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org