Posted to commits@spark.apache.org by do...@apache.org on 2020/09/15 01:34:31 UTC

[spark] branch branch-3.0 updated: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fe6ff15  [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
fe6ff15 is described below

commit fe6ff1577d18fc919db926894b32500e17e07ecb
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Mon Sep 14 18:24:52 2020 -0700

    [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast
    
    ### What changes were proposed in this pull request?
    In TorrentBroadcast.scala
    ```scala
    L133: if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false))
    L137: TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    L147: if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true))
    ```
    After the original value is stored successfully (TorrentBroadcast.scala: L133), the subsequent `blockifyObject()` (L137) or piece-storing (L147) step can still fail. When it does, there is no opportunity to release the broadcast from memory.
    
    This patch removes all pieces of the broadcast when blockifying the value fails or when storing some of its pieces fails.
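    
    A self-contained sketch of the same try/catch-and-cleanup pattern, with a plain map standing in for the BlockManager (the object and helper names below are illustrative only, not Spark APIs; the actual patch calls `blockManager.removeBroadcast(id, tellMaster = true)` in the catch block):
    
    ```scala
    import scala.collection.mutable
    
    // Illustrative stand-in for the BlockManager; not a real Spark API.
    object BroadcastCleanupSketch {
      private val store = mutable.Map.empty[String, Array[Byte]]
    
      def writeBlocks(id: Long, value: Array[Byte], blockSize: Int): Int = {
        store(s"broadcast_$id") = value                  // original value stored first
        try {
          val pieces = value.grouped(blockSize).toArray  // "blockify"; throws on bad blockSize
          pieces.zipWithIndex.foreach { case (piece, i) =>
            store(s"broadcast_${id}_piece$i") = piece    // store each piece
          }
          pieces.length
        } catch {
          case t: Throwable =>
            // Remove everything stored for this broadcast, including the
            // original value, so a failed write no longer leaks memory.
            store.keys
              .filter(k => k == s"broadcast_$id" || k.startsWith(s"broadcast_${id}_"))
              .toList
              .foreach(store.remove)
            throw t
        }
      }
    }
    ```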
    
    ### Why are the changes needed?
    We use the Spark Thrift Server as a long-running service. A bad query submitted a heavy BroadcastNestedLoopJoin operation and drove the driver into full GC. We killed the bad query, but the driver's memory usage stayed high and full GCs remained frequent. Investigating with a GC dump and the GC log, we found that the broadcast data may be leaking memory.
    
    ```
    2020-08-19T18:54:02.824-0700: [Full GC (Allocation Failure)
    2020-08-19T18:54:02.824-0700: [Class Histogram (before full gc):
    116G->112G(170G), 184.9121920 secs]
    [Eden: 32.0M(7616.0M)->0.0B(8704.0M) Survivors: 1088.0M->0.0B Heap: 116.4G(170.0G)->112.9G(170.0G)], [Metaspace: 177285K->177270K(182272K)]
     1:  676531691  72035438432  [B
     2:  676502528  32472121344  org.apache.spark.sql.catalyst.expressions.UnsafeRow
     3:      99551  12018117568  [Ljava.lang.Object;
     4:      26570   4349629040  [I
     5:          6   3264536688  [Lorg.apache.spark.sql.catalyst.InternalRow;
     6:    1708819    256299456  [C
     7:       2338    179615208  [J
     8:    1703669     54517408  java.lang.String
     9:     103860     34896960  org.apache.spark.status.TaskDataWrapper
    10:     177396     25545024  java.net.URI
    ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested. A unit test for this is hard to write, and the patch is straightforward.
    
    Closes #29558 from LantaoJin/SPARK-32715.
    
    Authored-by: LantaoJin <ji...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 7a9b066c66d29e946b4f384292021123beb6fe57)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/broadcast/TorrentBroadcast.scala  | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 77fbbc0..1024d9b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -133,22 +133,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
       throw new SparkException(s"Failed to store $broadcastId in BlockManager")
     }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+      blocks.length
+    } catch {
+      case t: Throwable =>
+        logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
+        blockManager.removeBroadcast(id, tellMaster = true)
+        throw t
     }
-    blocks.length
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */

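For context, the patched code runs on the driver when a broadcast variable is created, so the failure path is reachable from ordinary user code. A minimal driver-side usage sketch (assuming an existing `SparkContext` named `sc`; not part of the patch):

```scala
// Creating a broadcast variable stores the full value and then its pieces in
// the BlockManager via the code patched above. `sc` is an assumed,
// pre-existing SparkContext.
val data: Seq[Array[Byte]] = Seq.fill(4)(new Array[Byte](4 * 1024 * 1024))
val bc = sc.broadcast(data)   // may fail while storing pieces; now cleaned up
println(bc.value.length)      // use the broadcast as usual
bc.destroy()                  // explicit cleanup on the success path
```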
