Posted to commits@spark.apache.org by jo...@apache.org on 2014/11/18 21:09:35 UTC

spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer (For branch-1.1)

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 aa9ebdaa2 -> 91b5fa824


[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer (For branch-1.1)

This patch fixes a subtle memory leak in ConnectionManager's ACK timeout TimerTasks: in the old code, each TimerTask held a reference to the message being sent, and a cancelled TimerTask isn't necessarily garbage-collected until the time at which it was originally scheduled to run. As a result, messages built up and could not be garbage collected until their timeouts expired, leading to OOMs.
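
For illustration only, a minimal sketch of the old, leaky pattern using java.util.Timer (LeakySketch and sendWithTimeout are made-up names, not the actual ConnectionManager code):

  import java.util.{Timer, TimerTask}

  object LeakySketch {
    private val ackTimeoutTimer = new Timer("AckTimeoutMonitor", true)

    def sendWithTimeout(message: Array[Byte], timeoutMillis: Long): TimerTask = {
      val task = new TimerTask {
        // The closure captures `message` itself.  Even after task.cancel(), the Timer
        // keeps the task in its internal queue until the originally scheduled time,
        // so the message cannot be garbage-collected until the timeout would have fired.
        override def run(): Unit = {
          println(s"No ack for a ${message.length}-byte message within $timeoutMillis ms")
        }
      }
      ackTimeoutTimer.schedule(task, timeoutMillis)
      task  // the caller cancels this on ack, but cancel() alone does not release `message`
    }
  }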

This patch addresses the problem by capturing only the message ID in the TimerTask instead of the whole message, and by keeping only a WeakReference to the promise in the TimerTask. I've also modified this code to use Netty's HashedWheelTimer, whose performance characteristics should be better for this use case.
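
A minimal sketch of the fixed pattern, assuming Netty's HashedWheelTimer is available on the classpath (FixedSketch, scheduleAckTimeout, and the Promise[Unit] type are illustrative, not the actual ConnectionManager API):

  import java.io.IOException
  import java.lang.ref.WeakReference
  import java.util.concurrent.TimeUnit

  import scala.concurrent.Promise

  import io.netty.util.{HashedWheelTimer, Timeout, TimerTask}

  object FixedSketch {
    private val ackTimeoutMonitor = new HashedWheelTimer()

    // Capture only the message id plus a WeakReference to the promise, so a timeout
    // lingering on the wheel after cancellation cannot keep the message (or an
    // already-completed promise) alive.
    def scheduleAckTimeout(messageId: Int, promise: Promise[Unit], ackTimeoutSecs: Long): Timeout = {
      val promiseReference = new WeakReference(promise)
      ackTimeoutMonitor.newTimeout(new TimerTask {
        override def run(timeout: Timeout): Unit = {
          val p = promiseReference.get
          if (p != null) {
            // tryFailure is a no-op if the promise was already completed by an ack
            p.tryFailure(new IOException(
              s"no ack received for message $messageId within $ackTimeoutSecs sec"))
          }
        }
      }, ackTimeoutSecs, TimeUnit.SECONDS)
    }
  }

On ack, the caller cancels the Timeout handle returned by newTimeout; at shutdown, the wheel is shut down with stop() rather than java.util.Timer's cancel().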

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #3321 from sarutak/connection-manager-timeout-bugfix and squashes the following commits:

786af91 [Kousuke Saruta] Fixed memory leak issue of ConnectionManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91b5fa82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91b5fa82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91b5fa82

Branch: refs/heads/branch-1.1
Commit: 91b5fa82477e5fd43712fdf067d92a31d4037a83
Parents: aa9ebda
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Tue Nov 18 12:09:18 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Nov 18 12:09:18 2014 -0800

----------------------------------------------------------------------
 .../spark/network/ConnectionManager.scala       | 52 +++++++++++++++-----
 1 file changed, 39 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91b5fa82/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 578d806..6d58129 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.network
 
 import java.io.IOException
+import java.lang.ref.WeakReference
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.net._
-import java.util.{Timer, TimerTask}
 import java.util.concurrent.atomic.AtomicInteger
 
 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -37,6 +37,8 @@ import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
+
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
@@ -68,7 +70,8 @@ private[spark] class ConnectionManager(
   }
 
   private val selector = SelectorProvider.provider.openSelector()
-  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
+  private val ackTimeoutMonitor =
+    new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
 
   // default to 30 second timeout waiting for authentication
   private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
@@ -105,7 +108,10 @@ private[spark] class ConnectionManager(
     new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
     with SynchronizedMap[ConnectionManagerId, SendingConnection]
-  private val messageStatuses = new HashMap[Int, MessageStatus]
+  // Tracks sent messages for which we are awaiting acknowledgements.  Entries are added to this
+  // map when messages are sent and are removed when acknowledgement messages are received or when
+  // acknowledgement timeouts expire
+  private val messageStatuses = new HashMap[Int, MessageStatus]  // [MessageId, MessageStatus]
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
@@ -846,20 +852,41 @@ private[spark] class ConnectionManager(
       : Future[Message] = {
     val promise = Promise[Message]()
 
-    val timeoutTask = new TimerTask {
-      override def run(): Unit = {
+    // It's important that the TimerTask doesn't capture a reference to `message`, which can cause
+    // memory leaks since cancelled TimerTasks won't necessarily be garbage collected until the time
+    // at which they would originally be scheduled to run.  Therefore, extract the message id
+    // from outside of the TimerTask closure (see SPARK-4393 for more context).
+    val messageId = message.id
+    // Keep a weak reference to the promise so that the completed promise may be garbage-collected
+    val promiseReference = new WeakReference(promise)
+    val timeoutTask: TimerTask = new TimerTask {
+      override def run(timeout: Timeout): Unit = {
         messageStatuses.synchronized {
-          messageStatuses.remove(message.id).foreach ( s => {
-            promise.failure(
-              new IOException("sendMessageReliably failed because ack " +
-                s"was not received within $ackTimeout sec"))
-          })
+          messageStatuses.remove(messageId).foreach { s =>
+            val e = new IOException("sendMessageReliably failed because ack " +
+               s"was not received within $ackTimeout sec")
+            val p = promiseReference.get
+            if (p != null) {
+              // Attempt to fail the promise with a Timeout exception
+              if (!p.tryFailure(e)) {
+                // If we reach here, then someone else has already signalled success or failure
+                // on this promise, so log a warning:
+                logError("Ignore error because promise is completed", e)
+              }
+            } else {
+              // The WeakReference was empty, which should never happen because
+              // sendMessageReliably's caller should have a strong reference to promise.future;
+              logError("Promise was garbage collected; this should never happen!", e)
+            }
+          }
         }
       }
     }
 
+    val timeoutTaskHandle = ackTimeoutMonitor.newTimeout(timeoutTask, ackTimeout, TimeUnit.SECONDS)
+
     val status = new MessageStatus(message, connectionManagerId, s => {
-      timeoutTask.cancel()
+      timeoutTaskHandle.cancel()
       s.ackMessage match {
         case None => // Indicates a failure where we either never sent or never got ACK'd
           promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
@@ -876,7 +903,6 @@ private[spark] class ConnectionManager(
       messageStatuses += ((message.id, status))
     }
 
-    ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
     sendMessage(connectionManagerId, message)
     promise.future
   }
@@ -886,7 +912,7 @@ private[spark] class ConnectionManager(
   }
 
   def stop() {
-    ackTimeoutMonitor.cancel()
+    ackTimeoutMonitor.stop()
     selectorThread.interrupt()
     selectorThread.join()
     selector.close()

