You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/07/19 20:53:37 UTC

spark git commit: [SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch

Repository: spark
Updated Branches:
  refs/heads/master 70fe99dc6 -> ef6177558


[SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch

## What changes were proposed in this pull request?
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` , to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both the scenarios - when external shuffle is enabled as well as disabled.

## How was this patch tested?
Ran the job with the default configuration which does not change the existing behavior and ran it with few configurations of lower values -10,20,50,100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <dh...@gmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef617755
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef617755
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef617755

Branch: refs/heads/master
Commit: ef617755868077dbc57de4e7edea8f01335f5556
Parents: 70fe99d
Author: Dhruve Ashar <dh...@gmail.com>
Authored: Wed Jul 19 15:53:28 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Wed Jul 19 15:53:28 2017 -0500

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  | 11 +++
 .../spark/shuffle/BlockStoreShuffleReader.scala |  1 +
 .../storage/ShuffleBlockFetcherIterator.scala   | 81 +++++++++++++++++---
 .../ShuffleBlockFetcherIteratorSuite.scala      |  6 ++
 docs/configuration.md                           |  9 +++
 5 files changed, 98 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef617755/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 512d539..ef28e2c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -321,6 +321,17 @@ package object config {
       .intConf
       .createWithDefault(3)
 
+  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
+    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
+      .doc("This configuration limits the number of remote blocks being fetched per reduce task" +
+        " from a given host port. When a large number of blocks are being requested from a given" +
+        " address in a single fetch or simultaneously, this could crash the serving executor or" +
+        " Node Manager. This is especially useful to reduce the load on the Node Manager when" +
+        " external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
+      .intConf
+      .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
+      .createWithDefault(Int.MaxValue)
+
   private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
     ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
       .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +

http://git-wip-us.apache.org/repos/asf/spark/blob/ef617755/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 2fbac79..c8d1460 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -51,6 +51,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
       // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
       SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
+      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
       SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
       SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ef617755/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 81d822d..2d176b6 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -52,6 +52,8 @@ import org.apache.spark.util.io.ChunkedByteBufferOutputStream
  * @param streamWrapper A function to wrap the returned input stream.
  * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
  * @param maxReqsInFlight max number of remote requests to fetch blocks at any given point.
+ * @param maxBlocksInFlightPerAddress max number of shuffle blocks being fetched at any given point
+ *                                    for a given remote host:port.
  * @param maxReqSizeShuffleToMem max size (in bytes) of a request that can be shuffled to memory.
  * @param detectCorrupt whether to detect any corruption in fetched blocks.
  */
@@ -64,6 +66,7 @@ final class ShuffleBlockFetcherIterator(
     streamWrapper: (BlockId, InputStream) => InputStream,
     maxBytesInFlight: Long,
     maxReqsInFlight: Int,
+    maxBlocksInFlightPerAddress: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
   extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
@@ -110,12 +113,21 @@ final class ShuffleBlockFetcherIterator(
    */
   private[this] val fetchRequests = new Queue[FetchRequest]
 
+  /**
+   * Queue of fetch requests which could not be issued the first time they were dequeued. These
+   * requests are tried again when the fetch constraints are satisfied.
+   */
+  private[this] val deferredFetchRequests = new HashMap[BlockManagerId, Queue[FetchRequest]]()
+
   /** Current bytes in flight from our requests */
   private[this] var bytesInFlight = 0L
 
   /** Current number of requests in flight */
   private[this] var reqsInFlight = 0
 
+  /** Current number of blocks in flight per host:port */
+  private[this] val numBlocksInFlightPerAddress = new HashMap[BlockManagerId, Int]()
+
   /**
    * The blocks that can't be decompressed successfully, it is used to guarantee that we retry
    * at most once for those corrupted blocks.
@@ -248,7 +260,8 @@ final class ShuffleBlockFetcherIterator(
     // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
     // nodes, rather than blocking on reading output from one node.
     val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
+    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
+      + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)
 
     // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
     // at most maxBytesInFlight in order to limit the amount of data in flight.
@@ -277,11 +290,13 @@ final class ShuffleBlockFetcherIterator(
           } else if (size < 0) {
             throw new BlockException(blockId, "Negative block size " + size)
           }
-          if (curRequestSize >= targetRequestSize) {
+          if (curRequestSize >= targetRequestSize ||
+              curBlocks.size >= maxBlocksInFlightPerAddress) {
             // Add this FetchRequest
             remoteRequests += new FetchRequest(address, curBlocks)
+            logDebug(s"Creating fetch request of $curRequestSize at $address "
+              + s"with ${curBlocks.size} blocks")
             curBlocks = new ArrayBuffer[(BlockId, Long)]
-            logDebug(s"Creating fetch request of $curRequestSize at $address")
             curRequestSize = 0
           }
         }
@@ -375,6 +390,7 @@ final class ShuffleBlockFetcherIterator(
       result match {
         case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
           if (address != blockManager.blockManagerId) {
+            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
             shuffleMetrics.incRemoteBytesRead(buf.size)
             if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
               shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
@@ -443,12 +459,57 @@ final class ShuffleBlockFetcherIterator(
   }
 
   private def fetchUpToMaxBytes(): Unit = {
-    // Send fetch requests up to maxBytesInFlight
-    while (fetchRequests.nonEmpty &&
-      (bytesInFlight == 0 ||
-        (reqsInFlight + 1 <= maxReqsInFlight &&
-          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
-      sendRequest(fetchRequests.dequeue())
+    // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote host
+    // immediately, defer the request until the next time it can be processed.
+
+    // Process any outstanding deferred fetch requests if possible.
+    if (deferredFetchRequests.nonEmpty) {
+      for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
+        while (isRemoteBlockFetchable(defReqQueue) &&
+            !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
+          val request = defReqQueue.dequeue()
+          logDebug(s"Processing deferred fetch request for $remoteAddress with "
+            + s"${request.blocks.length} blocks")
+          send(remoteAddress, request)
+          if (defReqQueue.isEmpty) {
+            deferredFetchRequests -= remoteAddress
+          }
+        }
+      }
+    }
+
+    // Process any regular fetch requests if possible.
+    while (isRemoteBlockFetchable(fetchRequests)) {
+      val request = fetchRequests.dequeue()
+      val remoteAddress = request.address
+      if (isRemoteAddressMaxedOut(remoteAddress, request)) {
+        logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
+        val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
+        defReqQueue.enqueue(request)
+        deferredFetchRequests(remoteAddress) = defReqQueue
+      } else {
+        send(remoteAddress, request)
+      }
+    }
+
+    def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
+      sendRequest(request)
+      numBlocksInFlightPerAddress(remoteAddress) =
+        numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
+    }
+
+    def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
+      fetchReqQueue.nonEmpty &&
+        (bytesInFlight == 0 ||
+          (reqsInFlight + 1 <= maxReqsInFlight &&
+            bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
+    }
+
+    // Checks if sending a new fetch request will exceed the max no. of blocks being fetched from a
+    // given remote address.
+    def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
+      numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
+        maxBlocksInFlightPerAddress
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ef617755/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 6a70ced..c371cbc 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -110,6 +110,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       48 * 1024 * 1024,
       Int.MaxValue,
       Int.MaxValue,
+      Int.MaxValue,
       true)
 
     // 3 local blocks fetched in initialization
@@ -187,6 +188,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       48 * 1024 * 1024,
       Int.MaxValue,
       Int.MaxValue,
+      Int.MaxValue,
       true)
 
     verify(blocks(ShuffleBlockId(0, 0, 0)), times(0)).release()
@@ -254,6 +256,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       48 * 1024 * 1024,
       Int.MaxValue,
       Int.MaxValue,
+      Int.MaxValue,
       true)
 
     // Continue only after the mock calls onBlockFetchFailure
@@ -319,6 +322,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       48 * 1024 * 1024,
       Int.MaxValue,
       Int.MaxValue,
+      Int.MaxValue,
       true)
 
     // Continue only after the mock calls onBlockFetchFailure
@@ -400,6 +404,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       48 * 1024 * 1024,
       Int.MaxValue,
       Int.MaxValue,
+      Int.MaxValue,
       false)
 
     // Continue only after the mock calls onBlockFetchFailure
@@ -457,6 +462,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
         (_, in) => in,
         maxBytesInFlight = Int.MaxValue,
         maxReqsInFlight = Int.MaxValue,
+        maxBlocksInFlightPerAddress = Int.MaxValue,
         maxReqSizeShuffleToMem = 200,
         detectCorrupt = true)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ef617755/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 91b5bef..d3df923 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -536,6 +536,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+    <td><code>spark.reducer.maxBlocksInFlightPerAddress</code></td>
+    <td>Int.MaxValue</td>
+    <td>
+      This configuration limits the number of remote blocks being fetched per reduce task from a
+      given host port. When a large number of blocks are being requested from a given address in a
+      single fetch or simultaneously, this could crash the serving executor or Node Manager. This
+      is especially useful to reduce the load on the Node Manager when external shuffle is enabled.
+      You can mitigate this issue by setting it to a lower value.
+    </td>
   <td><code>spark.reducer.maxReqSizeShuffleToMem</code></td>
   <td>Long.MaxValue</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org