You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/31 15:15:32 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #25262: [SPARK-28486][CORE][PYTHON] Map PythonBroadcast's data file to a BroadcastBlock to avoid delete by GC

Ngone51 commented on a change in pull request #25262: [SPARK-28486][CORE][PYTHON] Map PythonBroadcast's data file to a BroadcastBlock to avoid delete by GC
URL: https://github.com/apache/spark/pull/25262#discussion_r309279029
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 ##########
 @@ -717,33 +720,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   }
 
   /**
-   * Write data into disk, using randomly generated name.
+   * Write data into disk and map it to a broadcast block.
    */
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
-    val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
-    val file = File.createTempFile("broadcast", "", dir)
-    path = file.getAbsolutePath
-    val out = new FileOutputStream(file)
-    Utils.tryWithSafeFinally {
-      Utils.copyStream(in, out)
-    } {
-      out.close()
-    }
-  }
-
-  /**
-   * Delete the file once the object is GCed.
-   */
-  override def finalize() {
-    if (!path.isEmpty) {
-      val file = new File(path)
-      if (file.exists()) {
-        if (!file.delete()) {
-          logWarning(s"Error deleting ${file.getPath}")
+  private def readObject(in: ObjectInputStream): Unit = {
+    broadcastId = in.readLong()
+    val blockId = BroadcastBlockId(broadcastId, "python")
+    val blockManager = SparkEnv.get.blockManager
+    val diskBlockManager = blockManager.diskBlockManager
+    if (!diskBlockManager.containsBlock(blockId)) {
+      Utils.tryOrIOException {
+        val dir = new File(Utils.getLocalDir(SparkEnv.get.conf))
+        val file = File.createTempFile("broadcast", "", dir)
+        val out = new FileOutputStream(file)
+        Utils.tryWithSafeFinally {
+          val size = Utils.copyStream(in, out)
 
 Review comment:
  Is this necessary? I think we've already acquired the write lock on the block before we modify it, haven't we?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org