Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/22 06:22:40 UTC

[GitHub] [spark] mridulm commented on a change in pull request #32287: [SPARK-27991][CORE] Defer the fetch request on Netty OOM

mridulm commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r618106484



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -683,7 +694,28 @@ final class ShuffleBlockFetcherIterator(
             }
           }
 
-        case FailureFetchResult(blockId, mapIndex, address, e) =>
+        // Catching the OOM and reacting to it is only a workaround for the Netty OOM
+        // issue, not a proper approach to memory management. We can get rid of it once
+        // we find a way to manage Netty's memory precisely.
+        case FailureFetchResult(blockId, mapIndex, address, size, isNetworkReqDone, e)
+            if e.isInstanceOf[OutOfDirectMemoryError] || e.isInstanceOf[NettyOutOfMemoryError] =>
+          assert(address != blockManager.blockManagerId &&
+            !hostLocalBlocks.contains(blockId -> mapIndex),
+            "Netty OOM error should only happen on remote fetch requests")
+          logWarning(s"Failed to fetch block $blockId due to Netty OOM, will retry", e)
+          NettyUtils.isNettyOOMOnShuffle = true
+          numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+          bytesInFlight -= size
+          if (isNetworkReqDone) {
+            reqsInFlight -= 1
+            logDebug("Number of requests in flight " + reqsInFlight)
+          }
+          val defReqQueue =
+            deferredFetchRequests.getOrElseUpdate(address, new Queue[FetchRequest]())
+          defReqQueue.enqueue(FetchRequest(address, Array(FetchBlockInfo(blockId, size, mapIndex))))

Review comment:
       How many times will we retry?
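   To make the retry question concrete, here is a minimal standalone sketch of the defer-and-retry flow the diff implements. `FetchRequest` is simplified, and the `maxRetries` cap is purely hypothetical (the diff as shown re-enqueues with no bound, which is what this question is probing):
   
   ```scala
   import scala.collection.mutable
   
   // Simplified stand-in: the real FetchRequest carries FetchBlockInfo entries.
   case class FetchRequest(address: String, blockIds: Seq[String], retries: Int = 0)
   
   object DeferOnOomSketch {
     @volatile var isNettyOOMOnShuffle = false
     val deferredFetchRequests = new mutable.HashMap[String, mutable.Queue[FetchRequest]]()
     val maxRetries = 3 // hypothetical cap; the diff itself does not bound retries
   
     def onFetchFailure(req: FetchRequest, e: Throwable): Unit = e match {
       case _: OutOfMemoryError if req.retries < maxRetries =>
         // Record that Netty ran out of direct memory so new sends get paused...
         isNettyOOMOnShuffle = true
         // ...and park the failed request to be replayed once memory frees up.
         deferredFetchRequests
           .getOrElseUpdate(req.address, new mutable.Queue[FetchRequest]())
           .enqueue(req.copy(retries = req.retries + 1))
       case _ =>
         throw e // non-OOM failure (or retries exhausted): surface it to the caller
     }
   }
   ```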

##########
File path: common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
##########
@@ -37,6 +37,14 @@
  */
 public class NettyUtils {
 
+  /**
+   * A flag indicating whether a Netty OOM error has been raised during shuffle.
+   * If true, all pending shuffle fetch requests will be deferred until the flag
+   * is unset (which happens whenever a fetch request completes), unless there
+   * are no fetch requests currently in flight.
+   */
+  public static volatile boolean isNettyOOMOnShuffle = false;

Review comment:
       super nit: Make it an `AtomicBoolean` instead?
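   For reference, a sketch of what the suggestion could look like (written in Scala for consistency with the other snippets here; the actual field lives in the Java class `NettyUtils`):
   
   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   
   object NettyUtilsSketch {
     // One object for both reads and atomic updates, instead of a volatile boolean.
     val isNettyOOMOnShuffle = new AtomicBoolean(false)
   }
   
   // Set the flag when a Netty OOM is caught:
   //   NettyUtilsSketch.isNettyOOMOnShuffle.set(true)
   // Unset it exactly once when a request completes, without racing other threads:
   //   NettyUtilsSketch.isNettyOOMOnShuffle.compareAndSet(true, false)
   ```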

##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -801,6 +838,8 @@ private class BufferReleasingInputStream(
     if (!closed) {
       delegate.close()
       iterator.releaseCurrentResultBuffer()
+      // Unset the flag when a remote request finished.
+      if (isNetworkReqDone) NettyUtils.isNettyOOMOnShuffle = false
       closed = true

Review comment:
       Move this into a try/finally? Exceptions from close can get ignored, and `isNettyOOMOnShuffle` would then never get unset.
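   A sketch of the suggested restructuring, with the cleanup in a `finally` block so the flag is unset even when `delegate.close()` throws. The class below is a hypothetical scaffold; only the try/finally shape mirrors the code under review:
   
   ```scala
   import java.io.InputStream
   
   object NettyOomFlag { @volatile var isNettyOOMOnShuffle = false } // stand-in flag
   
   class BufferReleasingInputStreamSketch(
       delegate: InputStream,
       isNetworkReqDone: Boolean,
       releaseBuffer: () => Unit) extends InputStream {
   
     private var closed = false
   
     override def read(): Int = delegate.read()
   
     override def close(): Unit = {
       if (!closed) {
         try {
           delegate.close()
           releaseBuffer()
         } finally {
           // Runs even if close() throws, so the flag can never stay stuck on true.
           if (isNetworkReqDone) NettyOomFlag.isNettyOOMOnShuffle = false
           closed = true
         }
       }
     }
   }
   ```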

##########
File path: common/network-common/src/main/java/org/apache/spark/network/util/NettyOutOfMemoryError.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+/**
+ * This class is for test use only. It's needed because the constructor of Netty's
+ * {@link io.netty.util.internal.OutOfDirectMemoryError} cannot be accessed, nor can
+ * the class be mocked, since the type is declared as final.
+ */
+public class NettyOutOfMemoryError extends OutOfMemoryError {

Review comment:
       Nit: Rename it to make clear this is for tests only?
   
   Thought: Can we use `SparkOutOfMemoryError` instead? We could move it out to a module accessible from here as well.
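   For illustration, a hypothetical test snippet showing why such a stand-in is needed: Netty's `OutOfDirectMemoryError` is final with an inaccessible constructor, so a test has to throw a substitute. `Fetcher` and the no-arg constructor are assumptions made for this sketch:
   
   ```scala
   import org.mockito.Mockito.{mock, when}
   import org.apache.spark.network.util.NettyOutOfMemoryError
   
   trait Fetcher { def fetchBlock(id: String): Array[Byte] }
   
   val fetcher = mock(classOf[Fetcher])
   // Netty's own error type cannot be instantiated here, so throw the stand-in.
   when(fetcher.fetchBlock("block-0")).thenThrow(new NettyOutOfMemoryError())
   // The code under test can now exercise its Netty-OOM handling path.
   ```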



