You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/10 15:29:58 UTC

[GitHub] [spark] mridulm commented on a change in pull request #29558: [SPARK-32715][CORE] Fix memory leak when failed to store pieces of broadcast

mridulm commented on a change in pull request #29558:
URL: https://github.com/apache/spark/pull/29558#discussion_r486436039



##########
File path: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
##########
@@ -130,25 +130,33 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
     val blockManager = SparkEnv.get.blockManager
-    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
-      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
-    }
-    val blocks =
-      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
-    if (checksumEnabled) {
-      checksums = new Array[Int](blocks.length)
-    }
-    blocks.zipWithIndex.foreach { case (block, i) =>
+    try {
+      if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
+        throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+      }
+      val blocks =
+        TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
       if (checksumEnabled) {
-        checksums(i) = calcChecksum(block)
+        checksums = new Array[Int](blocks.length)
       }
-      val pieceId = BroadcastBlockId(id, "piece" + i)
-      val bytes = new ChunkedByteBuffer(block.duplicate())
-      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
-        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
+      blocks.zipWithIndex.foreach { case (block, i) =>
+        if (checksumEnabled) {
+          checksums(i) = calcChecksum(block)
+        }
+        val pieceId = BroadcastBlockId(id, "piece" + i)
+        val bytes = new ChunkedByteBuffer(block.duplicate())
+        if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
+          throw new SparkException(s"Failed to store $pieceId of $broadcastId " +
+            s"in local BlockManager")
+        }
       }
+      blocks.length
+    } catch {
+      case t: Throwable =>
+        logError(s"Store broadcast $broadcastId fail, remove all pieces of the broadcast")
+        blockManager.removeBroadcast(id, tellMaster = true)

Review comment:
       I agree with @dongjoon-hyun - `blockManager.putSingle` for broadcastid should be outside try/catch. Rest look fine.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org