You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2014/10/27 16:45:49 UTC

git commit: [SPARK-4030] Make destroy public for broadcast variables

Repository: spark
Updated Branches:
  refs/heads/master 6377adaf3 -> 9aa340a23


[SPARK-4030] Make destroy public for broadcast variables

This change makes the destroy function public for broadcast variables. Motivation for the change is described in https://issues.apache.org/jira/browse/SPARK-4030.
This patch also logs where destroy was called from if a broadcast variable is used after destruction.

Author: Shivaram Venkataraman <sh...@cs.berkeley.edu>

Closes #2922 from shivaram/broadcast-destroy and squashes the following commits:

a11abab [Shivaram Venkataraman] Fix scala style in Utils.scala
bed9c9d [Shivaram Venkataraman] Make destroy blocking by default
e80c1ab [Shivaram Venkataraman] Make destroy public for broadcast variables. Also log where destroy was called from if a broadcast variable is used after destruction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9aa340a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9aa340a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9aa340a2

Branch: refs/heads/master
Commit: 9aa340a23fd7532f5e72c3352df92ce3e857fc80
Parents: 6377ada
Author: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Authored: Mon Oct 27 08:45:36 2014 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Oct 27 08:45:36 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/Broadcast.scala  | 22 ++++++++++++++++++--
 .../scala/org/apache/spark/util/Utils.scala     |  3 ++-
 .../apache/spark/broadcast/BroadcastSuite.scala | 20 +++++++++++++++++-
 3 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9aa340a2/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 15fd30e..87f5cf9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -20,6 +20,8 @@ package org.apache.spark.broadcast
 import java.io.Serializable
 
 import org.apache.spark.SparkException
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 import scala.reflect.ClassTag
 
@@ -52,7 +54,7 @@ import scala.reflect.ClassTag
  * @param id A unique identifier for the broadcast variable.
  * @tparam T Type of the data contained in the broadcast variable.
  */
-abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
+abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {
 
   /**
    * Flag signifying whether the broadcast variable is valid
@@ -60,6 +62,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
    */
   @volatile private var _isValid = true
 
+  private var _destroySite = ""
+
   /** Get the broadcasted value. */
   def value: T = {
     assertValid()
@@ -84,13 +88,26 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
     doUnpersist(blocking)
   }
 
+
+  /**
+   * Destroy all data and metadata related to this broadcast variable. Use this with caution;
+   * once a broadcast variable has been destroyed, it cannot be used again.
+   * This method blocks until destroy has completed
+   */
+  def destroy() {
+    destroy(blocking = true)
+  }
+
   /**
    * Destroy all data and metadata related to this broadcast variable. Use this with caution;
    * once a broadcast variable has been destroyed, it cannot be used again.
+   * @param blocking Whether to block until destroy has completed
    */
   private[spark] def destroy(blocking: Boolean) {
     assertValid()
     _isValid = false
+    _destroySite = Utils.getCallSite().shortForm
+    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
     doDestroy(blocking)
   }
 
@@ -124,7 +141,8 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
   /** Check if this broadcast is valid. If not valid, exception is thrown. */
   protected def assertValid() {
     if (!_isValid) {
-      throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
+      throw new SparkException(
+        "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9aa340a2/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d722ee5..84ed5db 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -992,7 +992,8 @@ private[spark] object Utils extends Logging {
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
     // finding the call site of a method.
-    val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+    val SPARK_CORE_CLASS_REGEX =
+      """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?(\.broadcast)?\.[A-Z]""".r
     val SCALA_CLASS_REGEX = """^scala""".r
     val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
     val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined

http://git-wip-us.apache.org/repos/asf/spark/blob/9aa340a2/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index e096c8c..1014fd6 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.broadcast
 
 import scala.util.Random
 
-import org.scalatest.FunSuite
+import org.scalatest.{Assertions, FunSuite}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
 import org.apache.spark.io.SnappyCompressionCodec
@@ -136,6 +136,12 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
   test("Unpersisting TorrentBroadcast on executors and driver in distributed mode") {
     testUnpersistTorrentBroadcast(distributed = true, removeFromDriver = true)
   }
+
+  test("Using broadcast after destroy prints callsite") {
+    sc = new SparkContext("local", "test")
+    testPackage.runCallSiteTest(sc)
+  }
+
   /**
    * Verify the persistence of state associated with an HttpBroadcast in either local mode or
    * local-cluster mode (when distributed = true).
@@ -311,3 +317,15 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     conf
   }
 }
+
+package object testPackage extends Assertions {
+
+  def runCallSiteTest(sc: SparkContext) {
+    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val broadcast = sc.broadcast(rdd)
+    broadcast.destroy()
+    val thrown = intercept[SparkException] { broadcast.value }
+    assert(thrown.getMessage.contains("BroadcastSuite.scala"))
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org