You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/05 09:25:12 UTC

spark git commit: [SPARK-20930][ML] Destroy broadcasted centers after computing cost in KMeans

Repository: spark
Updated Branches:
  refs/heads/master 2d39711b0 -> 98b5ccd32


[SPARK-20930][ML] Destroy broadcasted centers after computing cost in KMeans

## What changes were proposed in this pull request?
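Destroy the broadcast cluster centers after computing the cost in `KMeansModel.computeCost`. The patch also destroys the broadcast variables in `LocalLDAModel` and `GradientDescent` once the jobs that read them have finished, and replaces the broadcast in `LocalLDAModel.getTopicDistributionMethod` with direct use of the local matrix.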
## How was this patch tested?
Existing tests.

Author: Zheng RuiFeng <ru...@foxmail.com>

Closes #18152 from zhengruifeng/destroy_kmeans_model.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98b5ccd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98b5ccd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98b5ccd3

Branch: refs/heads/master
Commit: 98b5ccd32b909cccc38899efa923ca425b116744
Parents: 2d39711
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Mon Jun 5 10:25:09 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jun 5 10:25:09 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala   | 5 ++++-
 .../main/scala/org/apache/spark/mllib/clustering/LDAModel.scala | 4 ++--
 .../org/apache/spark/mllib/optimization/GradientDescent.scala   | 1 +
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index df2a9c0..3ad08c4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
   @Since("0.8.0")
   def computeCost(data: RDD[Vector]): Double = {
     val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
-    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    val cost = data
+      .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+    bcCentersWithNorm.destroy(blocking = false)
+    cost
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 663f63c..4ab4200 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -320,6 +320,7 @@ class LocalLDAModel private[spark] (
 
         docBound
       }.sum()
+    ElogbetaBc.destroy(blocking = false)
 
     // Bound component for prob(topic-term distributions):
     //   E[log p(beta | eta) - log q(beta | lambda)]
@@ -372,7 +373,6 @@ class LocalLDAModel private[spark] (
    */
   private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = {
     val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
-    val expElogbetaBc = sc.broadcast(expElogbeta)
     val docConcentrationBrz = this.docConcentration.asBreeze
     val gammaShape = this.gammaShape
     val k = this.k
@@ -383,7 +383,7 @@ class LocalLDAModel private[spark] (
       } else {
         val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts,
-          expElogbetaBc.value,
+          expElogbeta,
           docConcentrationBrz,
           gammaShape,
           k)

http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 07a67a9..593cdd6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -246,6 +246,7 @@ object GradientDescent extends Logging {
             // c: (grad, loss, count)
             (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
           })
+      bcWeights.destroy(blocking = false)
 
       if (miniBatchSize > 0) {
         /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org