You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/10 08:26:36 UTC

[1/3] git commit: SPARK-1407 drain event queue before stopping event logger

Repository: spark
Updated Branches:
  refs/heads/branch-1.0 bde9cc11f -> 8ca3b2bc9


SPARK-1407 drain event queue before stopping event logger

Author: Kan Zhang <kz...@apache.org>

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64

Branch: refs/heads/branch-1.0
Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20
Parents: bde9cc1
Author: Kan Zhang <kz...@apache.org>
Authored: Wed Apr 9 15:24:33 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Apr 9 15:25:29 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/LiveListenerBus.scala       | 32 ++++++++------
 .../spark/scheduler/SparkListenerSuite.scala    | 45 ++++++++++++++++++++
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 4 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f775051..7630523 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     ui.stop()
-    eventLogger.foreach(_.stop())
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
       metadataCleaner.cancel()
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
-      listenerBus.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
       ShuffleMapTask.clearCache()
       ResultTask.clearCache()
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
     } else {
       logInfo("SparkContext already stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 353a486..76f3e32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+  private val listenerThread = new Thread("SparkListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        if (event == SparkListenerShutdown) {
+          // Get out of the while loop and shutdown the daemon thread
+          return
+        }
+        postToAll(event)
+      }
+    }
+  }
+
+  // Exposed for testing
+  @volatile private[spark] var stopCalled = false
 
   /**
    * Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     if (started) {
       throw new IllegalStateException("Listener bus already started!")
     }
+    listenerThread.start()
     started = true
-    new Thread("SparkListenerBus") {
-      setDaemon(true)
-      override def run() {
-        while (true) {
-          val event = eventQueue.take
-          if (event == SparkListenerShutdown) {
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          }
-          postToAll(event)
-        }
-      }
-    }.start()
   }
 
   def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   def stop() {
+    stopCalled = true
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
     }
     post(SparkListenerShutdown)
+    listenerThread.join()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7c84377..dc704e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.Semaphore
+
 import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     }
   }
 
+  test("bus.stop() waits for the event queue to completely drain") {
+    @volatile var drained = false
+
+    // Tells the listener to stop blocking
+    val listenerWait = new Semaphore(1)
+
+    // When stop has returned
+    val stopReturned = new Semaphore(1)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerWait.acquire()
+        drained = true
+      }
+    }
+
+    val bus = new LiveListenerBus
+    val blockingListener = new BlockingListener
+
+    bus.addListener(blockingListener)
+    bus.start()
+    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+
+    // the queue should not drain immediately
+    assert(!drained)
+
+    new Thread("ListenerBusStopper") {
+      override def run() {
+        // stop() is expected to block until listenerWait.release() is called below
+        bus.stop()
+        stopReturned.release(1)
+      }
+    }.start()
+
+    while (!bus.stopCalled) {
+      Thread.sleep(10)
+    }
+
+    listenerWait.release()
+    stopReturned.acquire()
+    assert(drained)
+  }
+
   test("basic creation of StageInfo") {
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index e698b9b..038afbc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -73,6 +73,6 @@ object SparkHdfsLR {
     }
 
     println("Final w: " + w)
-    System.exit(0)
+    sc.stop()
   }
 }


[2/3] git commit: [SPARK-1357 (fix)] remove empty line after :: DeveloperApi/Experimental ::

Posted by pw...@apache.org.
[SPARK-1357 (fix)] remove empty line after :: DeveloperApi/Experimental ::

Remove empty line after :: DeveloperApi/Experimental :: in comments to make the original doc show up in the preview of the generated html docs. Thanks @andrewor14 !

Author: Xiangrui Meng <me...@databricks.com>

Closes #373 from mengxr/api and squashes the following commits:

9c35bdc [Xiangrui Meng] remove the empty line after :: DeveloperApi/Experimental ::


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0adc932a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0adc932a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0adc932a

Branch: refs/heads/branch-1.0
Commit: 0adc932add413a1754107b21d5ecfb38c0c3a4eb
Parents: eb5f2b6
Author: Xiangrui Meng <me...@databricks.com>
Authored: Wed Apr 9 17:08:17 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Apr 9 17:08:17 2014 -0700

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  1 -
 .../spark/mllib/classification/NaiveBayes.scala |  1 -
 .../apache/spark/mllib/clustering/KMeans.scala  |  4 --
 .../linalg/distributed/CoordinateMatrix.scala   |  1 -
 .../linalg/distributed/IndexedRowMatrix.scala   |  2 -
 .../mllib/linalg/distributed/RowMatrix.scala    |  1 -
 .../spark/mllib/optimization/Gradient.scala     |  4 --
 .../mllib/optimization/GradientDescent.scala    |  2 -
 .../spark/mllib/optimization/Optimizer.scala    |  1 -
 .../spark/mllib/optimization/Updater.scala      |  4 --
 .../apache/spark/mllib/recommendation/ALS.scala |  1 -
 .../MatrixFactorizationModel.scala              |  1 -
 .../regression/GeneralizedLinearAlgorithm.scala |  1 -
 .../apache/spark/mllib/tree/DecisionTree.scala  |  1 -
 .../spark/mllib/tree/configuration/Algo.scala   |  1 -
 .../mllib/tree/configuration/FeatureType.scala  |  1 -
 .../tree/configuration/QuantileStrategy.scala   |  1 -
 .../mllib/tree/configuration/Strategy.scala     |  1 -
 .../spark/mllib/tree/impurity/Entropy.scala     |  2 -
 .../apache/spark/mllib/tree/impurity/Gini.scala |  2 -
 .../spark/mllib/tree/impurity/Impurity.scala    |  3 --
 .../spark/mllib/tree/impurity/Variance.scala    |  2 -
 .../mllib/tree/model/DecisionTreeModel.scala    |  1 -
 .../mllib/tree/model/InformationGainStats.scala |  1 -
 .../apache/spark/mllib/tree/model/Node.scala    |  1 -
 .../apache/spark/mllib/tree/model/Split.scala   |  1 -
 .../spark/mllib/util/DataValidators.scala       |  1 -
 .../spark/mllib/util/KMeansDataGenerator.scala  |  1 -
 .../spark/mllib/util/LinearDataGenerator.scala  |  1 -
 .../util/LogisticRegressionDataGenerator.scala  |  1 -
 .../spark/mllib/util/MFDataGenerator.scala      | 43 ++++++++++----------
 .../org/apache/spark/mllib/util/MLUtils.scala   |  2 -
 .../spark/mllib/util/SVMDataGenerator.scala     |  1 -
 33 files changed, 21 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index ae27c57..a6c049e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -30,7 +30,6 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: DeveloperApi ::
- *
  * The Java stubs necessary for the Python mllib bindings.
  */
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 5a45f12..1865885 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: Experimental ::
- *
  * Model for Naive Bayes Classifiers.
  *
  * @param labels list of labels

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 8f565eb..90cf852 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -78,7 +78,6 @@ class KMeans private (
 
   /**
    * :: Experimental ::
-   *
    * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
    * this many times with random starting conditions (configured by the initialization mode), then
    * return the best clustering found over any run. Default: 1.
@@ -398,9 +397,6 @@ object KMeans {
     MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
   }
 
-  /**
-   * :: Experimental ::
-   */
   @Experimental
   def main(args: Array[String]) {
     if (args.length < 4) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
index 89d5c03..56b8fdc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala
@@ -34,7 +34,6 @@ case class MatrixEntry(i: Long, j: Long, value: Double)
 
 /**
  * :: Experimental ::
- *
  * Represents a matrix in coordinate format.
  *
  * @param entries matrix entries

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
index 24c123a..132b3af 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
@@ -26,7 +26,6 @@ import org.apache.spark.mllib.linalg.SingularValueDecomposition
 
 /**
  * :: Experimental ::
- *
  * Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
  */
 @Experimental
@@ -34,7 +33,6 @@ case class IndexedRow(index: Long, vector: Vector)
 
 /**
  * :: Experimental ::
- *
  * Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
  * indexed rows.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 8d32c1a..f65f43d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -30,7 +30,6 @@ import org.apache.spark.Logging
 
 /**
  * :: Experimental ::
- *
  * Represents a row-oriented distributed Matrix with no meaningful row indices.
  *
  * @param rows rows stored as an RDD[Vector]

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 1176dc9..679842f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * :: DeveloperApi ::
- *
  * Class used to compute the gradient for a loss function, given a single data point.
  */
 @DeveloperApi
@@ -56,7 +55,6 @@ abstract class Gradient extends Serializable {
 
 /**
  * :: DeveloperApi ::
- *
  * Compute gradient and loss for a logistic loss function, as used in binary classification.
  * See also the documentation for the precise formulation.
  */
@@ -100,7 +98,6 @@ class LogisticGradient extends Gradient {
 
 /**
  * :: DeveloperApi ::
- *
  * Compute gradient and loss for a Least-squared loss function, as used in linear regression.
  * This is correct for the averaged least squares loss function (mean squared error)
  *              L = 1/n ||A weights-y||^2
@@ -135,7 +132,6 @@ class LeastSquaresGradient extends Gradient {
 
 /**
  * :: DeveloperApi ::
- *
  * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
  * See also the documentation for the precise formulation.
  * NOTE: This assumes that the labels are {0,1}

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 04267d9..f60417f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -28,7 +28,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * :: DeveloperApi ::
- *
  * Class used to solve an optimization problem using Gradient Descent.
  * @param gradient Gradient function to be used.
  * @param updater Updater to be used to update weights after every iteration.
@@ -113,7 +112,6 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat
 
 /**
  * :: DeveloperApi ::
- *
  * Top-level method to run gradient descent.
  */
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index 0a313f3..e41d9bb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.Vector
 
 /**
  * :: DeveloperApi ::
- *
  * Trait for optimization problem solvers.
  */
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index e678167..3ed3a5b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -26,7 +26,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * :: DeveloperApi ::
- *
  * Class used to perform steps (weight update) using Gradient Descent methods.
  *
  * For general minimization problems, or for regularized problems of the form
@@ -64,7 +63,6 @@ abstract class Updater extends Serializable {
 
 /**
  * :: DeveloperApi ::
- *
  * A simple updater for gradient descent *without* any regularization.
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
@@ -86,7 +84,6 @@ class SimpleUpdater extends Updater {
 
 /**
  * :: DeveloperApi ::
- *
  * Updater for L1 regularized problems.
  *          R(w) = ||w||_1
  * Uses a step-size decreasing with the square root of the number of iterations.
@@ -131,7 +128,6 @@ class L1Updater extends Updater {
 
 /**
  * :: DeveloperApi ::
- *
  * Updater for L2 regularized problems.
  *          R(w) = 1/2 ||w||^2
  * Uses a step-size decreasing with the square root of the number of iterations.

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 60cbb1c..5cc47de 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -140,7 +140,6 @@ class ALS private (
 
   /**
    * :: Experimental ::
-   *
    * Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
    */
   @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index e05224f..471546c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -70,7 +70,6 @@ class MatrixFactorizationModel(
 
   /**
    * :: DeveloperApi ::
-   *
    * Predict the rating of many users for many products.
    * This is a Java stub for python predictAll()
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index c24f5af..3bd0017 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -103,7 +103,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   /**
    * :: Experimental ::
-   *
    * Set if the algorithm should validate data before training. Default true.
    */
   @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index c8a966c..3019447 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -35,7 +35,6 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * :: Experimental ::
- *
  * A class that implements a decision tree algorithm for classification and regression. It
  * supports both continuous and categorical features.
  * @param strategy The configuration parameters for the tree algorithm which specify the type

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
index 017f84f..79a01f5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Algo.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental
 
 /**
  * :: Experimental ::
- *
  * Enum to select the algorithm for the decision tree
  */
 @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/FeatureType.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/FeatureType.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/FeatureType.scala
index c0254c3..f4c8772 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/FeatureType.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/FeatureType.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental
 
 /**
  * :: Experimental ::
- *
  * Enum to describe whether a feature is "continuous" or "categorical"
  */
 @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/QuantileStrategy.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/QuantileStrategy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/QuantileStrategy.scala
index b3e8b22..7da976e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/QuantileStrategy.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/QuantileStrategy.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental
 
 /**
  * :: Experimental ::
- *
  * Enum for selecting the quantile calculation strategy
  */
 @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
index 482faaa..8767aca 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
 
 /**
  * :: Experimental ::
- *
  * Stores all the configuration options for tree construction
  * @param algo classification or regression
  * @param impurity criterion used for information gain calculation

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala
index 55c43f2..60f43e9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Entropy.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 
 /**
  * :: Experimental ::
- *
  * Class for calculating [[http://en.wikipedia.org/wiki/Binary_entropy_function entropy]] during
  * binary classification.
  */
@@ -32,7 +31,6 @@ object Entropy extends Impurity {
 
   /**
    * :: DeveloperApi ::
-   *
    * entropy calculation
    * @param c0 count of instances with label 0
    * @param c1 count of instances with label 1

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala
index c923b8e..c51d76d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Gini.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 
 /**
  * :: Experimental ::
- *
  * Class for calculating the
  * [[http://en.wikipedia.org/wiki/Decision_tree_learning#Gini_impurity Gini impurity]]
  * during binary classification.
@@ -31,7 +30,6 @@ object Gini extends Impurity {
 
   /**
    * :: DeveloperApi ::
-   *
    * Gini coefficient calculation
    * @param c0 count of instances with label 0
    * @param c1 count of instances with label 1

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
index f407796..8eab247 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 
 /**
  * :: Experimental ::
- *
  * Trait for calculating information gain.
  */
 @Experimental
@@ -29,7 +28,6 @@ trait Impurity extends Serializable {
 
   /**
    * :: DeveloperApi ::
-   *
    * information calculation for binary classification
    * @param c0 count of instances with label 0
    * @param c1 count of instances with label 1
@@ -40,7 +38,6 @@ trait Impurity extends Serializable {
 
   /**
    * :: DeveloperApi ::
-   *
    * information calculation for regression
    * @param count number of instances
    * @param sum sum of labels

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala
index 2c64644..47d0712 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Variance.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 
 /**
  * :: Experimental ::
- *
  * Class for calculating variance during regression
  */
 @Experimental
@@ -31,7 +30,6 @@ object Variance extends Impurity {
 
   /**
    * :: DeveloperApi ::
-   *
    * variance calculation
    * @param count number of instances
    * @param sum sum of labels

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 0f76f4a..bf692ca 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.Vector
 
 /**
  * :: Experimental ::
- *
  * Model to store the decision tree parameters
  * @param topNode root node
  * @param algo algorithm type -- classification or regression

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala
index d36b58e..cc8a24c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.DeveloperApi
 
 /**
  * :: DeveloperApi ::
- *
  * Information gain statistics for each split
  * @param gain information gain value
  * @param impurity current node impurity

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
index 3399721..682f213 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.Vector
 
 /**
  * :: DeveloperApi ::
- *
  * Node in a decision tree
  * @param id integer node id
  * @param predict predicted value at the node

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Split.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Split.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Split.scala
index 8bbb343..d7ffd38 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Split.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Split.scala
@@ -22,7 +22,6 @@ import org.apache.spark.mllib.tree.configuration.FeatureType.FeatureType
 
 /**
  * :: DeveloperApi ::
- *
  * Split applied to a feature
  * @param feature feature index
  * @param threshold threshold for continuous feature

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala
index 230c409..45f9548 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/DataValidators.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
  * :: DeveloperApi ::
- *
  * A collection of methods used to validate data before applying ML algorithms.
  */
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala
index e693d13..6eaebaf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/KMeansDataGenerator.scala
@@ -25,7 +25,6 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: DeveloperApi ::
- *
  * Generate test data for KMeans. This class first chooses k cluster centers
  * from a d-dimensional Gaussian distribution scaled by factor r and then creates a Gaussian
  * cluster with scale 1 around each center.

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 140ff92..c8e160d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -30,7 +30,6 @@ import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
  * :: DeveloperApi ::
- *
  * Generate sample data used for Linear Data. This class generates
  * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
  * response variable `Y`.

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index ca06b9a..c82cd8f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -27,7 +27,6 @@ import org.apache.spark.mllib.linalg.Vectors
 
 /**
  * :: DeveloperApi ::
- *
  * Generate test data for LogisticRegression. This class chooses positive labels
  * with probability `probOne` and scales features for positive examples by `eps`.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index 3bd86d6..3f413fa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -27,29 +27,28 @@ import org.apache.spark.rdd.RDD
 
 /**
  * :: DeveloperApi ::
+ * Generate RDD(s) containing data for Matrix Factorization.
  *
-* Generate RDD(s) containing data for Matrix Factorization.
-*
-* This method samples training entries according to the oversampling factor
-* 'trainSampFact', which is a multiplicative factor of the number of
-* degrees of freedom of the matrix: rank*(m+n-rank).
-*
-* It optionally samples entries for a testing matrix using
-* 'testSampFact', the percentage of the number of training entries
-* to use for testing.
-*
-* This method takes the following inputs:
-*   sparkMaster    (String) The master URL.
-*   outputPath     (String) Directory to save output.
-*   m              (Int) Number of rows in data matrix.
-*   n              (Int) Number of columns in data matrix.
-*   rank           (Int) Underlying rank of data matrix.
-*   trainSampFact  (Double) Oversampling factor.
-*   noise          (Boolean) Whether to add gaussian noise to training data.
-*   sigma          (Double) Standard deviation of added gaussian noise.
-*   test           (Boolean) Whether to create testing RDD.
-*   testSampFact   (Double) Percentage of training data to use as test data.
-*/
+ * This method samples training entries according to the oversampling factor
+ * 'trainSampFact', which is a multiplicative factor of the number of
+ * degrees of freedom of the matrix: rank*(m+n-rank).
+ *
+ * It optionally samples entries for a testing matrix using
+ * 'testSampFact', the percentage of the number of training entries
+ * to use for testing.
+ *
+ * This method takes the following inputs:
+ *   sparkMaster    (String) The master URL.
+ *   outputPath     (String) Directory to save output.
+ *   m              (Int) Number of rows in data matrix.
+ *   n              (Int) Number of columns in data matrix.
+ *   rank           (Int) Underlying rank of data matrix.
+ *   trainSampFact  (Double) Oversampling factor.
+ *   noise          (Boolean) Whether to add gaussian noise to training data.
+ *   sigma          (Double) Standard deviation of added gaussian noise.
+ *   test           (Boolean) Whether to create testing RDD.
+ *   testSampFact   (Double) Percentage of training data to use as test data.
+ */
 @DeveloperApi
 object MFDataGenerator {
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 7f9804d..ac2360c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -124,7 +124,6 @@ object MLUtils {
 
   /**
    * :: Experimental ::
-   *
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -146,7 +145,6 @@ object MLUtils {
 
   /**
    * :: Experimental ::
-   *
    * Save labeled data to a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.

http://git-wip-us.apache.org/repos/asf/spark/blob/0adc932a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index 87a6f2a..ba8190b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -29,7 +29,6 @@ import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
  * :: DeveloperApi ::
- *
  * Generate sample data used for SVM. This class generates uniform random values
  * for the features and adds Gaussian noise with weight 0.1 to generate labels.
  */


[3/3] git commit: SPARK-729: Closures not always serialized at capture time

Posted by pw...@apache.org.
SPARK-729:  Closures not always serialized at capture time

[SPARK-729](https://spark-project.atlassian.net/browse/SPARK-729) concerns when free variables in closure arguments to transformations are captured.  Currently, it is possible for closures to get the environment in which they are serialized (not the environment in which they are created).  There are a few possible approaches to solving this problem and this PR will discuss some of them.  The approach I took has the advantage of being simple, obviously correct, and minimally-invasive, but it preserves something that has been bothering me about Spark's closure handling, so I'd like to discuss an alternative and get some feedback on whether or not it is worth pursuing.

## What I did

The basic approach I took depends on the work I did for #143, and so this PR is based atop that.  Specifically: #143 modifies `ClosureCleaner.clean` to preemptively determine whether or not closures are serializable immediately upon closure cleaning (rather than waiting for a job involving that closure to be scheduled).  Thus non-serializable closure exceptions will be triggered by the line defining the closure rather than triggered where the closure is used.

Since the easiest way to determine whether or not a closure is serializable is to attempt to serialize it, the code in #143 is creating a serialized closure as part of `ClosureCleaner.clean`.  `clean` currently modifies its argument, but the method in `SparkContext` that wraps it returns a value (a reference to the modified-in-place argument).  This branch modifies `ClosureCleaner.clean` so that it returns a value:  if it is cleaning a serializable closure, it returns the result of deserializing its serialized argument; therefore it is returning a closure with an environment captured at cleaning time.  `SparkContext.clean` then returns the result of `ClosureCleaner.clean`, rather than a reference to its modified-in-place argument.

I've added tests for this behavior (777a1bc).  The pull request as it stands, given the changes in #143, is nearly trivial.  There is some overhead from deserializing the closure, but it is minimal and the benefit of obvious operational correctness (vs. a more sophisticated but harder-to-validate transformation in `ClosureCleaner`) seems pretty important.  I think this is a fine way to solve this problem, but it's not perfect.

## What we might want to do

The thing that has been bothering me about Spark's handling of closures is that it seems like we should be able to statically ensure that cleaning and serialization happen exactly once for a given closure.  If we serialize a closure in order to determine whether or not it is serializable, we should be able to hang on to the generated byte buffer and use it instead of re-serializing the closure later.  By replacing closures with instances of a sum type that encodes whether or not a closure has been cleaned or serialized, we could handle clean, to-be-cleaned, and serialized closures separately with case matches.  Here's a somewhat-concrete sketch (taken from my git stash) of what this might look like:

```scala
package org.apache.spark.util

import java.nio.ByteBuffer
import scala.reflect.ClassManifest

sealed abstract class ClosureBox[T] { def func: T }
final case class RawClosure[T](func: T) extends ClosureBox[T] {}
final case class CleanedClosure[T](func: T) extends ClosureBox[T] {}
final case class SerializedClosure[T](func: T, bytebuf: ByteBuffer) extends ClosureBox[T] {}

object ClosureBoxImplicits {
  implicit def closureBoxFromFunc[T <: AnyRef](fun: T) = new RawClosure[T](fun)
}
```

With these types declared, we'd be able to change `ClosureCleaner.clean` to take a `ClosureBox[T=>U]` (possibly generated by implicit conversion) and return a `ClosureBox[T=>U]` (either a `CleanedClosure[T=>U]` or a `SerializedClosure[T=>U]`, depending on whether or not serializability-checking was enabled) instead of a `T=>U`.  A case match could thus short-circuit cleaning or serializing closures that had already been cleaned or serialized (both in `ClosureCleaner` and in the closure serializer).  Cleaned-and-serialized closures would be represented by a boxed tuple of the original closure and a serialized copy (complete with an environment quiesced at transformation time).  Additional implicit conversions could convert from `ClosureBox` instances to the underlying function type where appropriate.  Tracking this sort of state in the type system seems like the right thing to do to me.

### Why we might not want to do that

_It's pretty invasive._  Every function type used by every `RDD` subclass would have to change to reflect that they expected a `ClosureBox[T=>U]` instead of a `T=>U`.  This obscures what's going on and is not a little ugly.  Although I really like the idea of using the type system to enforce the clean-or-serialize once discipline, it might not be worth adding another layer of types (even if we could hide some of the extra boilerplate with judicious application of implicit conversions).

_It statically guarantees a property whose absence is unlikely to cause any serious problems as it stands._  It appears that all closures are currently dynamically cleaned once and it's not obvious that repeated closure-cleaning is likely to be a problem in the future.  Furthermore, serializing closures is relatively cheap, so doing it once to check for serialization and once again to actually ship them across the wire doesn't seem like a big deal.

Taken together, these seem like a high price to pay for statically guaranteeing that closures are operated upon only once.

## Other possibilities

I felt like the serialize-and-deserialize approach was best due to its obvious simplicity.  But it would be possible to do a more sophisticated transformation within `ClosureCleaner.clean`.  It might also be possible for `clean` to modify its argument in a way so that whether or not a given closure had been cleaned would be apparent upon inspection; this would buy us some of the operational benefits of the `ClosureBox` approach but not the static cleanliness.

I'm interested in any feedback or discussion on whether or not the problems with the type-based approach indeed outweigh the advantage, as well as on other approaches to this issue and to closure handling in general.

Author: William Benton <wi...@redhat.com>

Closes #189 from willb/spark-729 and squashes the following commits:

f4cafa0 [William Benton] Stylistic changes and cleanups
b3d9c86 [William Benton] Fixed style issues in tests
9b56ce0 [William Benton] Added array-element capture test
97e9d91 [William Benton] Split closure-serializability failure tests
12ef6e3 [William Benton] Skip proactive closure capture for runJob
8ee3ee7 [William Benton] Predictable closure environment capture
12c63a7 [William Benton] Added tests for variable capture in closures
d6e8dd6 [William Benton] Don't check serializability of DStream transforms.
4ecf841 [William Benton] Make proactive serializability checking optional.
d8df3db [William Benton] Adds proactive closure-serializablilty checking
21b4b06 [William Benton] Test cases for SPARK-897.
d5947b3 [William Benton] Ensure assertions in Graph.apply are asserted.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ca3b2bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ca3b2bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ca3b2bc

Branch: refs/heads/branch-1.0
Commit: 8ca3b2bc90a63b23a03f339e390174cd7a672b40
Parents: 0adc932
Author: William Benton <wi...@redhat.com>
Authored: Wed Apr 9 18:56:27 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Apr 9 18:56:27 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 ++--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  6 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 21 ++++-
 .../scala/org/apache/spark/FailureSuite.scala   | 17 +++-
 .../ProactiveClosureSerializationSuite.scala    | 94 ++++++++++++++++++++
 .../apache/spark/util/ClosureCleanerSuite.scala | 68 ++++++++++++++
 .../org/apache/spark/graphx/GraphSuite.scala    |  2 +-
 .../spark/streaming/dstream/DStream.scala       |  8 +-
 8 files changed, 218 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7630523..545807f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1002,7 +1002,9 @@ class SparkContext(config: SparkConf) extends Logging {
       require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
     }
     val callSite = getCallSite
-    val cleanedFunc = clean(func)
+    // There's no need to check this function for serializability,
+    // since it will be run right away.
+    val cleanedFunc = clean(func, false)
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1135,14 +1137,18 @@ class SparkContext(config: SparkConf) extends Logging {
   def cancelAllJobs() {
     dagScheduler.cancelAllJobs()
   }
-
+  
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
+   *
+   * @param f closure to be cleaned and optionally serialized
+   * @param captureNow whether or not to serialize this closure and capture any free 
+   * variables immediately; defaults to true.  If this is set and f is not serializable, 
+   * it will raise an exception.
    */
-  private[spark] def clean[F <: AnyRef](f: F): F = {
-    ClosureCleaner.clean(f)
-    f
+  private[spark] def clean[F <: AnyRef : ClassTag](f: F, captureNow: Boolean = true): F = {
+    ClosureCleaner.clean(f, captureNow)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3437b2c..e363ea7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -660,14 +660,16 @@ abstract class RDD[T: ClassTag](
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index cdbbc65..e474b1a 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -22,10 +22,14 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import scala.collection.mutable.Map
 import scala.collection.mutable.Set
 
+import scala.reflect.ClassTag
+
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
 import org.apache.spark.Logging
+import org.apache.spark.SparkEnv
+import org.apache.spark.SparkException
 
 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -101,7 +105,7 @@ private[spark] object ClosureCleaner extends Logging {
     }
   }
   
-  def clean(func: AnyRef) {
+  def clean[F <: AnyRef : ClassTag](func: F, captureNow: Boolean = true): F = {
     // TODO: cache outerClasses / innerClasses / accessedFields
     val outerClasses = getOuterClasses(func)
     val innerClasses = getInnerClasses(func)
@@ -150,6 +154,21 @@ private[spark] object ClosureCleaner extends Logging {
       field.setAccessible(true)
       field.set(func, outer)
     }
+    
+    if (captureNow) {
+      cloneViaSerializing(func)
+    } else {
+      func
+    }
+  }
+
+  private def cloneViaSerializing[T: ClassTag](func: T): T = {
+    try {
+      val serializer = SparkEnv.get.closureSerializer.newInstance()
+      serializer.deserialize[T](serializer.serialize[T](func))
+    } catch {
+      case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString)
+    }
   }
   
   private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 12dbebc..4f93004 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -107,7 +107,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
-  test("failure because task closure is not serializable") {
+  test("failure because closure in final-stage task is not serializable") {
     sc = new SparkContext("local[1,1]", "test")
     val a = new NonSerializable
 
@@ -118,6 +118,13 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.contains("NotSerializableException"))
 
+    FailureSuiteState.clear()
+  }
+
+  test("failure because closure in early-stage task is not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val a = new NonSerializable
+
     // Non-serializable closure in an earlier stage
     val thrown1 = intercept[SparkException] {
       sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
@@ -125,6 +132,13 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     assert(thrown1.getClass === classOf[SparkException])
     assert(thrown1.getMessage.contains("NotSerializableException"))
 
+    FailureSuiteState.clear()
+  }
+
+  test("failure because closure in foreach task is not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val a = new NonSerializable
+
     // Non-serializable closure in foreach function
     val thrown2 = intercept[SparkException] {
       sc.parallelize(1 to 10, 2).foreach(x => println(a))
@@ -135,5 +149,6 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+
   // TODO: Need to add tests with shuffle fetch failures.
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
new file mode 100644
index 0000000..7666226
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer;
+
+import java.io.NotSerializableException
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
+import org.apache.spark.SharedSparkContext
+
+/* A trivial (but unserializable) container for trivial functions */
+class UnserializableClass {
+  def op[T](x: T) = x.toString
+  
+  def pred[T](x: T) = x.toString.length % 2 == 0
+}
+
+class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext {
+
+  def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+
+  test("throws expected serialization exceptions on actions") {
+    val (data, uc) = fixture
+      
+    val ex = intercept[SparkException] {
+      data.map(uc.op(_)).count
+    }
+        
+    assert(ex.getMessage.matches(".*Task not serializable.*"))
+  }
+
+  // There is probably a cleaner way to eliminate boilerplate here, but we're
+  // iterating over a map from transformation names to functions that perform that
+  // transformation on a given RDD, creating one test case for each
+  
+  for (transformation <- 
+      Map("map" -> map _, "flatMap" -> flatMap _, "filter" -> filter _, "mapWith" -> mapWith _,
+          "mapPartitions" -> mapPartitions _, "mapPartitionsWithIndex" -> mapPartitionsWithIndex _,
+          "mapPartitionsWithContext" -> mapPartitionsWithContext _, "filterWith" -> filterWith _)) {
+    val (name, xf) = transformation
+    
+    test(s"$name transformations throw proactive serialization exceptions") {
+      val (data, uc) = fixture
+      
+      val ex = intercept[SparkException] {
+        xf(data, uc)
+      }
+
+      assert(ex.getMessage.matches(".*Task not serializable.*"), s"RDD.$name doesn't proactively throw NotSerializableException")
+    }
+  }
+  
+  def map(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.map(y => uc.op(y))
+
+  def mapWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapWith(x => x.toString)((x,y) => x + uc.op(y))
+    
+  def flatMap(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.flatMap(y=>Seq(uc.op(y)))
+  
+  def filter(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.filter(y=>uc.pred(y))
+  
+  def filterWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.filterWith(x => x.toString)((x,y) => uc.pred(y))
+  
+  def mapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitions(_.map(y => uc.op(y)))
+  
+  def mapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitionsWithIndex((_, it) => it.map(y => uc.op(y)))
+  
+  def mapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] =
+    x.mapPartitionsWithContext((_, it) => it.map(y => uc.op(y)))
+  
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 439e564..c635da6 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -50,6 +50,27 @@ class ClosureCleanerSuite extends FunSuite {
     val obj = new TestClassWithNesting(1)
     assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
   }
+  
+  test("capturing free variables in closures at RDD definition") {
+    val obj = new TestCaptureVarClass()
+    val (ones, onesPlusZeroes) = obj.run()
+    
+    assert(ones === onesPlusZeroes)
+  }
+
+  test("capturing free variable fields in closures at RDD definition") {
+    val obj = new TestCaptureFieldClass()
+    val (ones, onesPlusZeroes) = obj.run()
+    
+    assert(ones === onesPlusZeroes)
+  }
+  
+  test("capturing arrays in closures at RDD definition") {
+    val obj = new TestCaptureArrayEltClass()
+    val (observed, expected) = obj.run()
+    
+    assert(observed === expected)
+  }
 }
 
 // A non-serializable class we create in closures to make sure that we aren't
@@ -143,3 +164,50 @@ class TestClassWithNesting(val y: Int) extends Serializable {
     }
   }
 }
+
+class TestCaptureFieldClass extends Serializable {
+  class ZeroBox extends Serializable {
+    var zero = 0
+  }
+
+  def run(): (Int, Int) = {
+    val zb = new ZeroBox
+  
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val ones = sc.parallelize(Array(1, 1, 1, 1, 1))
+      val onesPlusZeroes = ones.map(_ + zb.zero)
+
+      zb.zero = 5
+    
+      (ones.reduce(_ + _), onesPlusZeroes.reduce(_ + _))
+    }
+  }
+}
+
+class TestCaptureArrayEltClass extends Serializable {
+  def run(): (Int, Int) = {
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val rdd = sc.parallelize(1 to 10)
+      val data = Array(1, 2, 3)
+      val expected = data(0)
+      val mapped = rdd.map(x => data(0))
+      data(0) = 4
+      (mapped.first, expected)
+    }
+  }
+}
+
+class TestCaptureVarClass extends Serializable {
+  def run(): (Int, Int) = {
+    var zero = 0
+  
+    withSpark(new SparkContext("local", "test")) {sc =>
+      val ones = sc.parallelize(Array(1, 1, 1, 1, 1))
+      val onesPlusZeroes = ones.map(_ + zero)
+
+      zero = 5
+    
+      (ones.reduce(_ + _), onesPlusZeroes.reduce(_ + _))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 28d34dd..c65e366 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -62,7 +62,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert( graph.edges.count() === rawEdges.size )
       // Vertices not explicitly provided but referenced by edges should be created automatically
       assert( graph.vertices.count() === 100)
-      graph.triplets.map { et =>
+      graph.triplets.collect.map { et =>
         assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
         assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ca3b2bc/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index d043200..4759b62 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
   }
 
   /**
@@ -547,7 +547,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
-    val cleanedF = context.sparkContext.clean(transformFunc)
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
@@ -562,7 +562,7 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
     ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
+    val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
   }
 
@@ -573,7 +573,7 @@ abstract class DStream[T: ClassTag] (
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = {
-    val cleanedF = ssc.sparkContext.clean(transformFunc)
+    val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 2)
       val rdd1 = rdds(0).asInstanceOf[RDD[T]]