Posted to commits@spark.apache.org by we...@apache.org on 2017/10/17 14:54:41 UTC

spark git commit: [SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM

Repository: spark
Updated Branches:
  refs/heads/master 99e32f8ba -> e1960c3d6


[SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM

## What changes were proposed in this pull request?

In the current code, `BlockManager#getRemoteBytes` calls `BlockTransferService#fetchBlockSync` to fetch a remote block. `fetchBlockSync` allocates a temporary `ByteBuffer` to hold the entire fetched block, which can lead to OOM if the block is too large or if several blocks are fetched simultaneously on the same executor.

This patch borrows the idea from shuffle fetch and spills large blocks to local disk before they are consumed by upstream code. The behavior is controlled by a newly added configuration: if the block size is below the threshold, the block is kept in memory; otherwise it is first spilled to disk and then read back from the disk file.
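
For example, the threshold could be set like this (a minimal sketch; `200m` is just the sample value used in the docs change below):

```scala
import org.apache.spark.SparkConf

// Remote blocks larger than 200 MB will be spilled to disk instead of
// being buffered entirely in memory.
val conf = new SparkConf()
  .set("spark.maxRemoteBlockSizeFetchToMem", "200m")
```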

To achieve this feature, I did the following:

1. Rename `TempShuffleFileManager` to `TempFileManager`, since it is no longer used only by shuffle.
2. Add a new `TempFileManager` implementation to manage the files of fetched remote blocks; these files are tracked with weak references and deleted automatically once they are no longer referenced (see the sketch below).
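
A minimal sketch of the weak-reference tracking idea (simplified from the `RemoteBlockTempFileManager` added in this patch, not the exact implementation):

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}

// A weak reference that remembers the file path, so the file can still be
// deleted after the File object itself has been garbage collected.
class FileRef(file: File, queue: ReferenceQueue[File])
    extends WeakReference[File](file, queue) {
  private val path = file.getAbsolutePath
  def cleanUp(): Unit = new File(path).delete()
}

val queue = new ReferenceQueue[File]
// The real implementation also keeps the FileRefs in a set so the references
// themselves stay reachable, and a daemon thread polls the queue, e.g.:
//   Option(queue.remove(1000)).map(_.asInstanceOf[FileRef]).foreach(_.cleanUp())
```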

## How was this patch tested?

This was tested by adding unit tests, plus manual verification in a local test that performing GC cleans up the files.

Author: jerryshao <ss...@hortonworks.com>

Closes #19476 from jerryshao/SPARK-22062.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1960c3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1960c3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1960c3d

Branch: refs/heads/master
Commit: e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698
Parents: 99e32f8
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Oct 17 22:54:38 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Oct 17 22:54:38 2017 +0800

----------------------------------------------------------------------
 .../network/shuffle/ExternalShuffleClient.java  |   4 +-
 .../network/shuffle/OneForOneBlockFetcher.java  |  12 +--
 .../spark/network/shuffle/ShuffleClient.java    |  10 +-
 .../spark/network/shuffle/TempFileManager.java  |  36 +++++++
 .../network/shuffle/TempShuffleFileManager.java |  36 -------
 .../main/scala/org/apache/spark/SparkConf.scala |   4 +-
 .../apache/spark/internal/config/package.scala  |  15 +--
 .../spark/network/BlockTransferService.scala    |  28 +++--
 .../netty/NettyBlockTransferService.scala       |   6 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |   2 +-
 .../org/apache/spark/storage/BlockManager.scala | 102 +++++++++++++++++--
 .../spark/storage/BlockManagerMaster.scala      |   6 ++
 .../storage/BlockManagerMasterEndpoint.scala    |  14 +++
 .../spark/storage/BlockManagerMessages.scala    |   7 ++
 .../storage/ShuffleBlockFetcherIterator.scala   |   8 +-
 .../org/apache/spark/DistributedSuite.scala     |   2 +-
 .../spark/storage/BlockManagerSuite.scala       |  57 ++++++++---
 .../ShuffleBlockFetcherIteratorSuite.scala      |  10 +-
 docs/configuration.md                           |  11 +-
 19 files changed, 266 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 7770244..510017f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -91,7 +91,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
@@ -99,7 +99,7 @@ public class ExternalShuffleClient extends ShuffleClient {
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId,
-              blockIds1, listener1, conf, tempShuffleFileManager).start();
+              blockIds1, listener1, conf, tempFileManager).start();
           };
 
       int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 66b67e2..3f2f20b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -58,7 +58,7 @@ public class OneForOneBlockFetcher {
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
   private final TransportConf transportConf;
-  private final TempShuffleFileManager tempShuffleFileManager;
+  private final TempFileManager tempFileManager;
 
   private StreamHandle streamHandle = null;
 
@@ -79,14 +79,14 @@ public class OneForOneBlockFetcher {
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      TempShuffleFileManager tempShuffleFileManager) {
+      TempFileManager tempFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    this.tempShuffleFileManager = tempShuffleFileManager;
+    this.tempFileManager = tempFileManager;
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -125,7 +125,7 @@ public class OneForOneBlockFetcher {
           // Immediately request all chunks -- we expect that the total size of the request is
           // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
           for (int i = 0; i < streamHandle.numChunks; i++) {
-            if (tempShuffleFileManager != null) {
+            if (tempFileManager != null) {
               client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                 new DownloadCallback(i));
             } else {
@@ -164,7 +164,7 @@ public class OneForOneBlockFetcher {
     private int chunkIndex;
 
     DownloadCallback(int chunkIndex) throws IOException {
-      this.targetFile = tempShuffleFileManager.createTempShuffleFile();
+      this.targetFile = tempFileManager.createTempFile();
       this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
       this.chunkIndex = chunkIndex;
     }
@@ -180,7 +180,7 @@ public class OneForOneBlockFetcher {
       ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
         targetFile.length());
       listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
-      if (!tempShuffleFileManager.registerTempShuffleFileToClean(targetFile)) {
+      if (!tempFileManager.registerTempFileToClean(targetFile)) {
         targetFile.delete();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 5bd4412..18b04fe 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -43,10 +43,10 @@ public abstract class ShuffleClient implements Closeable {
    * @param execId the executor id.
    * @param blockIds block ids to fetch.
    * @param listener the listener to receive block fetching status.
-   * @param tempShuffleFileManager TempShuffleFileManager to create and clean temp shuffle files.
-   *                               If it's not <code>null</code>, the remote blocks will be streamed
-   *                               into temp shuffle files to reduce the memory usage, otherwise,
-   *                               they will be kept in memory.
+   * @param tempFileManager TempFileManager to create and clean temp files.
+   *                        If it's not <code>null</code>, the remote blocks will be streamed
+   *                        into temp shuffle files to reduce the memory usage, otherwise,
+   *                        they will be kept in memory.
    */
   public abstract void fetchBlocks(
       String host,
@@ -54,7 +54,7 @@ public abstract class ShuffleClient implements Closeable {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      TempShuffleFileManager tempShuffleFileManager);
+      TempFileManager tempFileManager);
 
   /**
    * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
new file mode 100644
index 0000000..552364d
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+
+/**
+ * A manager to create temp block files to reduce the memory usage and also clean temp
+ * files when they won't be used any more.
+ */
+public interface TempFileManager {
+
+  /** Create a temp block file. */
+  File createTempFile();
+
+  /**
+   * Register a temp file to clean up when it won't be used any more. Return whether the
+   * file is registered successfully. If `false`, the caller should clean up the file by itself.
+   */
+  boolean registerTempFileToClean(File file);
+}

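To make the interface contract above concrete, here is a minimal, hypothetical implementation (the real ones in this patch are `ShuffleBlockFetcherIterator` and `BlockManager.RemoteBlockTempFileManager`):

```scala
import java.io.File
import java.nio.file.Files
import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.network.shuffle.TempFileManager

// Hypothetical manager: temp files go to the system temp dir and deletion is
// deferred to close(). Illustration only, not how Spark wires it up.
class SimpleTempFileManager extends TempFileManager {
  private val registered = Collections.synchronizedSet(new java.util.HashSet[File]())

  override def createTempFile(): File =
    Files.createTempFile("fetched-block-", ".tmp").toFile

  // Always accepts the file; per the contract, returning false would mean
  // the caller must delete the file itself.
  override def registerTempFileToClean(file: File): Boolean = registered.add(file)

  def close(): Unit = {
    registered.asScala.foreach(_.delete())
    registered.clear()
  }
}
```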
http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
deleted file mode 100644
index 84a5ed6..0000000
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.File;
-
-/**
- * A manager to create temp shuffle block files to reduce the memory usage and also clean temp
- * files when they won't be used any more.
- */
-public interface TempShuffleFileManager {
-
-  /** Create a temp shuffle block file. */
-  File createTempShuffleFile();
-
-  /**
-   * Register a temp shuffle file to clean up when it won't be used any more. Return whether the
-   * file is registered successfully. If `false`, the caller should clean up the file by itself.
-   */
-  boolean registerTempShuffleFileToClean(File file);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e61f943..57b3744 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -662,7 +662,9 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
     "spark.yarn.access.hadoopFileSystems" -> Seq(
-      AlternateConfig("spark.yarn.access.namenodes", "2.2"))
+      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
+    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
+      AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3"))
   )
 
   /**

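For illustration, the alternate-config mapping above means the deprecated key should still be honored when reading the new key (a sketch assuming `SparkConf`'s usual deprecation fallback, not a test from this patch):

```scala
import org.apache.spark.SparkConf

// Setting the pre-2.3 key...
val conf = new SparkConf().set("spark.reducer.maxReqSizeShuffleToMem", "200m")
// ...should be visible through the new key (with a deprecation warning logged).
assert(conf.get("spark.maxRemoteBlockSizeFetchToMem") == "200m")
```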
http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index efffdca..e7b406a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -357,13 +357,15 @@ package object config {
       .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
       .createWithDefault(Int.MaxValue)
 
-  private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
-    ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
-      .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
+  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
+    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
+      .doc("Remote block will be fetched to disk when size of the block is " +
         "above this threshold. This is to avoid a giant request takes too much memory. We can " +
-        "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
-        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
-        " service is disabled.")
+        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
+        "affect both shuffle fetch and block manager remote block fetch. For users who " +
+        "enabled external shuffle service, this feature can only be worked when external shuffle" +
+        " service is newer than Spark 2.2.")
+      .withAlternative("spark.reducer.maxReqSizeShuffleToMem")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
@@ -432,5 +434,4 @@ package object config {
     .stringConf
     .toSequence
     .createOptional
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index fe5fd2d..1d8a266 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -25,8 +25,8 @@ import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
@@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempShuffleFileManager: TempShuffleFileManager): Unit
+      tempFileManager: TempFileManager): Unit
 
   /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -87,7 +87,12 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
    *
    * It is also only available after [[init]] is invoked.
    */
-  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
+  def fetchBlockSync(
+      host: String,
+      port: Int,
+      execId: String,
+      blockId: String,
+      tempFileManager: TempFileManager): ManagedBuffer = {
     // A monitor for the thread to wait on.
     val result = Promise[ManagedBuffer]()
     fetchBlocks(host, port, execId, Array(blockId),
@@ -96,12 +101,17 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           result.failure(exception)
         }
         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-          val ret = ByteBuffer.allocate(data.size.toInt)
-          ret.put(data.nioByteBuffer())
-          ret.flip()
-          result.success(new NioManagedBuffer(ret))
+          data match {
+            case f: FileSegmentManagedBuffer =>
+              result.success(f)
+            case _ =>
+              val ret = ByteBuffer.allocate(data.size.toInt)
+              ret.put(data.nioByteBuffer())
+              ret.flip()
+              result.success(new NioManagedBuffer(ret))
+          }
         }
-      }, tempShuffleFileManager = null)
+      }, tempFileManager)
     ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 

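Caller-side, the new `fetchBlockSync` parameter works roughly as follows (a sketch in which `fetchMaybeToDisk` and its arguments are hypothetical; only `fetchBlockSync` itself comes from this patch):

```scala
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.TempFileManager

// A null manager keeps the old behavior (whole block buffered on-heap);
// a non-null manager lets the fetch return a FileSegmentManagedBuffer
// backed by a spilled temp file for large blocks.
def fetchMaybeToDisk(
    bts: BlockTransferService,
    host: String,
    port: Int,
    execId: String,
    blockId: String,
    tempFileManager: TempFileManager): ManagedBuffer = {
  bts.fetchBlockSync(host, port, execId, blockId, tempFileManager)
}
```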
http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 6a29e18..b7d8c35 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -32,7 +32,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
@@ -105,14 +105,14 @@ private[spark] class NettyBlockTransferService(
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      tempShuffleFileManager: TempShuffleFileManager): Unit = {
+      tempFileManager: TempFileManager): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
         override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
           val client = clientFactory.createClient(host, port)
           new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
-            transportConf, tempShuffleFileManager).start()
+            transportConf, tempFileManager).start()
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index c8d1460..0562d45 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -52,7 +52,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
       SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
-      SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
+      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
       SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
 
     val serializerInstance = dep.serializer.newInstance()

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a98083d..e0276a4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.storage
 
 import java.io._
+import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference}
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
@@ -39,7 +42,7 @@ import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.ExternalShuffleClient
+import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -203,6 +206,13 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
+  // A TempFileManager used to track all the files of remote blocks which above the
+  // specified memory threshold. Files will be deleted automatically based on weak reference.
+  // Exposed for test
+  private[storage] val remoteBlockTempFileManager =
+    new BlockManager.RemoteBlockTempFileManager(this)
+  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+
   /**
    * Initializes the BlockManager with the given appId. This is not performed in the constructor as
    * the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -632,8 +642,8 @@ private[spark] class BlockManager(
    * Return a list of locations for the given block, prioritizing the local machine since
    * multiple block managers can share the same host, followed by hosts on the same rack.
    */
-  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    val locs = Random.shuffle(master.getLocations(blockId))
+  private def sortLocations(locations: Seq[BlockManagerId]): Seq[BlockManagerId] = {
+    val locs = Random.shuffle(locations)
     val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
     blockManagerId.topologyInfo match {
       case None => preferredLocs ++ otherLocs
@@ -653,7 +663,25 @@ private[spark] class BlockManager(
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
     var totalFailureCount = 0
-    val locations = getLocations(blockId)
+
+    // Because all the remote blocks are registered in driver, it is not necessary to ask
+    // all the slave executors to get block status.
+    val locationsAndStatus = master.getLocationsAndStatus(blockId)
+    val blockSize = locationsAndStatus.map { b =>
+      b.status.diskSize.max(b.status.memSize)
+    }.getOrElse(0L)
+    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)
+
+    // If the block size is above the threshold, we should pass our FileManger to
+    // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
+    // null value means the block will be persisted in memory.
+    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
+      remoteBlockTempFileManager
+    } else {
+      null
+    }
+
+    val locations = sortLocations(blockLocations)
     val maxFetchFailures = locations.size
     var locationIterator = locations.iterator
     while (locationIterator.hasNext) {
@@ -661,7 +689,7 @@ private[spark] class BlockManager(
       logDebug(s"Getting remote block $blockId from $loc")
       val data = try {
         blockTransferService.fetchBlockSync(
-          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
+          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
       } catch {
         case NonFatal(e) =>
           runningFailureCount += 1
@@ -684,7 +712,7 @@ private[spark] class BlockManager(
           // take a significant amount of time. To get rid of these stale entries
           // we refresh the block locations after a certain number of fetch failures
           if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
-            locationIterator = getLocations(blockId).iterator
+            locationIterator = sortLocations(master.getLocations(blockId)).iterator
             logDebug(s"Refreshed locations from the driver " +
               s"after ${runningFailureCount} fetch failures.")
             runningFailureCount = 0
@@ -1512,6 +1540,7 @@ private[spark] class BlockManager(
       // Closing should be idempotent, but maybe not for the NioBlockTransferService.
       shuffleClient.close()
     }
+    remoteBlockTempFileManager.stop()
     diskBlockManager.stop()
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()
@@ -1552,4 +1581,65 @@ private[spark] object BlockManager {
     override val metricRegistry = new MetricRegistry
     metricRegistry.registerAll(metricSet)
   }
+
+  class RemoteBlockTempFileManager(blockManager: BlockManager)
+      extends TempFileManager with Logging {
+
+    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
+        extends WeakReference[File](file, referenceQueue) {
+      private val filePath = file.getAbsolutePath
+
+      def cleanUp(): Unit = {
+        logDebug(s"Clean up file $filePath")
+
+        if (!new File(filePath).delete()) {
+          logDebug(s"Fail to delete file $filePath")
+        }
+      }
+    }
+
+    private val referenceQueue = new JReferenceQueue[File]
+    private val referenceBuffer = Collections.newSetFromMap[ReferenceWithCleanup](
+      new ConcurrentHashMap)
+
+    private val POLL_TIMEOUT = 1000
+    @volatile private var stopped = false
+
+    private val cleaningThread = new Thread() { override def run() { keepCleaning() } }
+    cleaningThread.setDaemon(true)
+    cleaningThread.setName("RemoteBlock-temp-file-clean-thread")
+    cleaningThread.start()
+
+    override def createTempFile(): File = {
+      blockManager.diskBlockManager.createTempLocalBlock()._2
+    }
+
+    override def registerTempFileToClean(file: File): Boolean = {
+      referenceBuffer.add(new ReferenceWithCleanup(file, referenceQueue))
+    }
+
+    def stop(): Unit = {
+      stopped = true
+      cleaningThread.interrupt()
+      cleaningThread.join()
+    }
+
+    private def keepCleaning(): Unit = {
+      while (!stopped) {
+        try {
+          Option(referenceQueue.remove(POLL_TIMEOUT))
+            .map(_.asInstanceOf[ReferenceWithCleanup])
+            .foreach { ref =>
+              referenceBuffer.remove(ref)
+              ref.cleanUp()
+            }
+        } catch {
+          case _: InterruptedException =>
+            // no-op
+          case NonFatal(e) =>
+            logError("Error in cleaning thread", e)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 8b1dc0b..d24421b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -84,6 +84,12 @@ class BlockManagerMaster(
     driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
+  /** Get locations as well as status of the blockId from the driver */
+  def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
+    driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
+      GetLocationsAndStatus(blockId))
+  }
+
   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
     driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index df0a5f5..56d0266 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -82,6 +82,9 @@ class BlockManagerMasterEndpoint(
     case GetLocations(blockId) =>
       context.reply(getLocations(blockId))
 
+    case GetLocationsAndStatus(blockId) =>
+      context.reply(getLocationsAndStatus(blockId))
+
     case GetLocationsMultipleBlockIds(blockIds) =>
       context.reply(getLocationsMultipleBlockIds(blockIds))
 
@@ -422,6 +425,17 @@ class BlockManagerMasterEndpoint(
     if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
   }
 
+  private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
+    val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
+    val status = locations.headOption.flatMap { bmId => blockManagerInfo(bmId).getStatus(blockId) }
+
+    if (locations.nonEmpty && status.isDefined) {
+      Some(BlockLocationsAndStatus(locations, status.get))
+    } else {
+      None
+    }
+  }
+
   private def getLocationsMultipleBlockIds(
       blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
     blockIds.map(blockId => getLocations(blockId))

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0c0ff14..1bbe7a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -93,6 +93,13 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster
 
+  case class GetLocationsAndStatus(blockId: BlockId) extends ToBlockManagerMaster
+
+  // The response message of `GetLocationsAndStatus` request.
+  case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
+    assert(locations.nonEmpty)
+  }
+
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
   case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2d176b6..98b5a73 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
@@ -69,7 +69,7 @@ final class ShuffleBlockFetcherIterator(
     maxBlocksInFlightPerAddress: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -162,11 +162,11 @@ final class ShuffleBlockFetcherIterator(
     currentResult = null
   }
 
-  override def createTempShuffleFile(): File = {
+  override def createTempFile(): File = {
     blockManager.diskBlockManager.createTempLocalBlock()._2
   }
 
-  override def registerTempShuffleFileToClean(file: File): Boolean = synchronized {
+  override def registerTempFileToClean(file: File): Boolean = synchronized {
     if (isZombie) {
       false
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index bea67b7..f800561 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -171,7 +171,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val serializerManager = SparkEnv.get.serializerManager
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
-        blockId.toString)
+        blockId.toString, null)
       val deserialized = serializerManager.dataDeserializeStream(blockId,
         new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
       assert(deserialized === (1 to 100).toList)

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cfe89fd..d45c194 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage
 
-import java.io.File
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -45,14 +44,14 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
+import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -512,8 +511,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
 
     val blockManager = makeBlockManager(128, "exec", bmMaster)
-    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
-    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+    val sortLocations = PrivateMethod[Seq[BlockManagerId]]('sortLocations)
+    val locations = blockManager invokePrivate sortLocations(bmMaster.getLocations("test"))
     assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
   }
 
@@ -535,8 +534,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     blockManager.blockManagerId =
       BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
-    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
-    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+    val sortLocations = PrivateMethod[Seq[BlockManagerId]]('sortLocations)
+    val locations = blockManager invokePrivate sortLocations(bmMaster.getLocations("test"))
     assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
     assert(locations.flatMap(_.topologyInfo)
       === Seq(localRack, localRack, localRack, otherRack, otherRack))
@@ -1274,13 +1273,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // so that we have a chance to do location refresh
     val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh)
       .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) }
-    when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds)
+    when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn(
+      Option(BlockLocationsAndStatus(blockManagerIds, BlockStatus.empty)))
+    when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(
+      blockManagerIds)
+
     store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
       transferService = Option(mockBlockTransferService))
     val block = store.getRemoteBytes("item")
       .asInstanceOf[Option[ByteBuffer]]
     assert(block.isDefined)
-    verify(mockBlockManagerMaster, times(2)).getLocations("item")
+    verify(mockBlockManagerMaster, times(1)).getLocationsAndStatus("item")
+    verify(mockBlockManagerMaster, times(1)).getLocations("item")
   }
 
   test("SPARK-17484: block status is properly updated following an exception in put()") {
@@ -1371,8 +1375,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     server.close()
   }
 
+  test("fetch remote block to local disk if block size is larger than threshold") {
+    conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, 1000L)
+
+    val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
+    val mockBlockTransferService = new MockBlockTransferService(0)
+    val blockLocations = Seq(BlockManagerId("id-0", "host-0", 1))
+    val blockStatus = BlockStatus(StorageLevel.DISK_ONLY, 0L, 2000L)
+
+    when(mockBlockManagerMaster.getLocationsAndStatus(mc.any[BlockId])).thenReturn(
+      Option(BlockLocationsAndStatus(blockLocations, blockStatus)))
+    when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockLocations)
+
+    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
+      transferService = Option(mockBlockTransferService))
+    val block = store.getRemoteBytes("item")
+      .asInstanceOf[Option[ByteBuffer]]
+
+    assert(block.isDefined)
+    assert(mockBlockTransferService.numCalls === 1)
+    // assert FileManager is not null if the block size is larger than threshold.
+    assert(mockBlockTransferService.tempFileManager === store.remoteBlockTempFileManager)
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
+    var tempFileManager: TempFileManager = null
 
     override def init(blockDataManager: BlockDataManager): Unit = {}
 
@@ -1382,7 +1410,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         execId: String,
         blockIds: Array[String],
         listener: BlockFetchingListener,
-        tempShuffleFileManager: TempShuffleFileManager): Unit = {
+        tempFileManager: TempFileManager): Unit = {
       listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
     }
 
@@ -1394,7 +1422,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     override def uploadBlock(
         hostname: String,
-        port: Int, execId: String,
+        port: Int,
+        execId: String,
         blockId: BlockId,
         blockData: ManagedBuffer,
         level: StorageLevel,
@@ -1407,12 +1436,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         host: String,
         port: Int,
         execId: String,
-        blockId: String): ManagedBuffer = {
+        blockId: String,
+        tempFileManager: TempFileManager): ManagedBuffer = {
       numCalls += 1
+      this.tempFileManager = tempFileManager
       if (numCalls <= maxFailures) {
         throw new RuntimeException("Failing block fetch in the mock block transfer service")
       }
-      super.fetchBlockSync(host, port, execId, blockId)
+      super.fetchBlockSync(host, port, execId, blockId, tempFileManager)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index c371cbc..5bfe990 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{SparkFunSuite, TaskContext}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, TempShuffleFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager}
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
@@ -437,12 +437,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val remoteBlocks = Map[BlockId, ManagedBuffer](
       ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer())
     val transfer = mock(classOf[BlockTransferService])
-    var tempShuffleFileManager: TempShuffleFileManager = null
+    var tempFileManager: TempFileManager = null
     when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
       .thenAnswer(new Answer[Unit] {
         override def answer(invocation: InvocationOnMock): Unit = {
           val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-          tempShuffleFileManager = invocation.getArguments()(5).asInstanceOf[TempShuffleFileManager]
+          tempFileManager = invocation.getArguments()(5).asInstanceOf[TempFileManager]
           Future {
             listener.onBlockFetchSuccess(
               ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
@@ -472,13 +472,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     fetchShuffleBlock(blocksByAddress1)
     // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
     // shuffle block to disk.
-    assert(tempShuffleFileManager == null)
+    assert(tempFileManager == null)
 
     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
     fetchShuffleBlock(blocksByAddress2)
     // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
     // shuffle block to disk.
-    assert(tempShuffleFileManager != null)
+    assert(tempFileManager != null)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e1960c3d/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a777d3..bb06c8f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -547,13 +547,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.reducer.maxReqSizeShuffleToMem</code></td>
+  <td><code>spark.maxRemoteBlockSizeFetchToMem</code></td>
   <td>Long.MaxValue</td>
   <td>
-    The blocks of a shuffle request will be fetched to disk when size of the request is above
-    this threshold. This is to avoid a giant request takes too much memory. We can enable this
-    config by setting a specific value(e.g. 200m). Note that this config can be enabled only when
-    the shuffle shuffle service is newer than Spark-2.2 or the shuffle service is disabled.
+    The remote block will be fetched to disk when size of the block is above this threshold.
+    This is to avoid a giant request takes too much memory. We can enable this config by setting
+    a specific value(e.g. 200m). Note this configuration will affect both shuffle fetch 
+    and block manager remote block fetch. For users who enabled external shuffle service,
+    this feature can only be worked when external shuffle service is newer than Spark 2.2.
   </td>
 </tr>
 <tr>

