You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/08/23 16:51:21 UTC

spark git commit: [SPARK-21501] Change CacheLoader to limit entries based on memory footprint

Repository: spark
Updated Branches:
  refs/heads/master d6b30edd4 -> 1662e9311


[SPARK-21501] Change CacheLoader to limit entries based on memory footprint

Right now the Spark shuffle service has a cache for index files. It is bounded by the number of files cached (spark.shuffle.service.index.cache.entries). This can cause issues for jobs with a lot of reducers, because the size of each entry varies with the number of reducers.
We saw an issue with a job that had 170000 reducers: it caused the NM running the Spark shuffle service to use 700-800MB of memory by itself.
We should change this cache to be memory-based and only allow a certain amount of memory to be used. By memory-based, I mean the cache should have a limit of, say, 100MB.

https://issues.apache.org/jira/browse/SPARK-21501

Manual testing with 170000 reducers has been performed with the cache loaded up to the default 100MB limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit, and the evicted objects become eligible for garbage collection, thereby preventing the NM from crashing. No notable difference in runtime has been observed.

Author: Sanket Chintapalli <sc...@yahoo-inc.com>

Closes #18940 from redsanket/SPARK-21501.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1662e931
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1662e931
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1662e931

Branch: refs/heads/master
Commit: 1662e93119d68498942386906de309d35f4a135f
Parents: d6b30ed
Author: Sanket Chintapalli <sc...@yahoo-inc.com>
Authored: Wed Aug 23 11:51:11 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Wed Aug 23 11:51:11 2017 -0500

----------------------------------------------------------------------
 .../org/apache/spark/network/util/TransportConf.java     |  4 ++++
 .../network/shuffle/ExternalShuffleBlockResolver.java    | 11 +++++++++--
 .../spark/network/shuffle/ShuffleIndexInformation.java   | 11 ++++++++++-
 core/src/main/scala/org/apache/spark/SparkConf.scala     |  4 +++-
 docs/configuration.md                                    |  6 +++---
 5 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 88256b8..fa2ff42 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -67,6 +67,10 @@ public class TransportConf {
     return conf.getInt(name, defaultValue);
   }
 
+  public String get(String name, String defaultValue) {
+    return conf.get(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index d7ec0e2..e639989 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -33,6 +33,7 @@ import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -104,7 +105,7 @@ public class ExternalShuffleBlockResolver {
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
-    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
@@ -112,7 +113,13 @@ public class ExternalShuffleBlockResolver {
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-                                    .maximumSize(indexCacheEntries).build(indexCacheLoader);
+      .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+      .weigher(new Weigher<File, ShuffleIndexInformation>() {
+        public int weigh(File file, ShuffleIndexInformation indexInfo) {
+          return indexInfo.getSize();
+        }
+      })
+      .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
       executors = reloadRegisteredExecutors(db);

http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index 39ca9ba..386738e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -31,9 +31,10 @@ import java.nio.file.Files;
 public class ShuffleIndexInformation {
   /** offsets as long buffer */
   private final LongBuffer offsets;
+  private int size;
 
   public ShuffleIndexInformation(File indexFile) throws IOException {
-    int size = (int)indexFile.length();
+    size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
@@ -48,6 +49,14 @@ public class ShuffleIndexInformation {
   }
 
   /**
+   * Size of the index file
+   * @return size
+   */
+  public int getSize() {
+    return size;
+  }
+
+  /**
    * Get index offset for a particular reducer.
    */
   public ShuffleIndexRecord getIndex(int reduceId) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 715cfdc..e61f943 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
         "Please use the new blacklisting options, spark.blacklist.*"),
       DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
+        "Not used any more. Please use spark.shuffle.service.index.cache.size")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/1662e931/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index e7c0306..6e9fe59 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.service.index.cache.entries</code></td>
-  <td>1024</td>
+  <td><code>spark.shuffle.service.index.cache.size</code></td>
+  <td>100m</td>
   <td>
-    Max number of entries to keep in the index cache of the shuffle service.
+    Cache entries limited to the specified memory footprint.
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org