You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/05 09:25:12 UTC
spark git commit: [SPARK-20930][ML] Destroy broadcasted centers after
computing cost in KMeans
Repository: spark
Updated Branches:
refs/heads/master 2d39711b0 -> 98b5ccd32
[SPARK-20930][ML] Destroy broadcasted centers after computing cost in KMeans
## What changes were proposed in this pull request?
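Destroy the broadcast cluster centers after computing the cost in `KMeansModel.computeCost`.
The same non-blocking `destroy` call is added for `ElogbetaBc` in `LocalLDAModel` and for `bcWeights` in `GradientDescent`.
In addition, `LocalLDAModel.getTopicDistributionMethod` now uses `expElogbeta` directly instead of broadcasting it.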
## How was this patch tested?
Covered by the existing tests; no new tests were added.
Author: Zheng RuiFeng <ru...@foxmail.com>
Closes #18152 from zhengruifeng/destroy_kmeans_model.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98b5ccd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98b5ccd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98b5ccd3
Branch: refs/heads/master
Commit: 98b5ccd32b909cccc38899efa923ca425b116744
Parents: 2d39711
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Mon Jun 5 10:25:09 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jun 5 10:25:09 2017 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 5 ++++-
.../main/scala/org/apache/spark/mllib/clustering/LDAModel.scala | 4 ++--
.../org/apache/spark/mllib/optimization/GradientDescent.scala | 1 +
3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index df2a9c0..3ad08c4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec
@Since("0.8.0")
def computeCost(data: RDD[Vector]): Double = {
val bcCentersWithNorm = data.context.broadcast(clusterCentersWithNorm)
- data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+ val cost = data
+ .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
+ bcCentersWithNorm.destroy(blocking = false)
+ cost
}
http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 663f63c..4ab4200 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -320,6 +320,7 @@ class LocalLDAModel private[spark] (
docBound
}.sum()
+ ElogbetaBc.destroy(blocking = false)
// Bound component for prob(topic-term distributions):
// E[log p(beta | eta) - log q(beta | lambda)]
@@ -372,7 +373,6 @@ class LocalLDAModel private[spark] (
*/
private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = {
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
- val expElogbetaBc = sc.broadcast(expElogbeta)
val docConcentrationBrz = this.docConcentration.asBreeze
val gammaShape = this.gammaShape
val k = this.k
@@ -383,7 +383,7 @@ class LocalLDAModel private[spark] (
} else {
val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
- expElogbetaBc.value,
+ expElogbeta,
docConcentrationBrz,
gammaShape,
k)
http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 07a67a9..593cdd6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -246,6 +246,7 @@ object GradientDescent extends Logging {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
+ bcWeights.destroy(blocking = false)
if (miniBatchSize > 0) {
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org