Posted to reviews@spark.apache.org by derrickburns <gi...@git.apache.org> on 2014/09/17 01:02:01 UTC

[GitHub] spark pull request: This commit addresses SPARK-3218, SPARK-3219, ...

GitHub user derrickburns opened a pull request:

    https://github.com/apache/spark/pull/2419

    This commit addresses SPARK-3218, SPARK-3219, SPARK-3261, and SPARK-3424...

    ....
    
    This commit introduces a general distance function (PointOps) for the KMeans clusterer. There are no public API changes.  However, the behavior of the clusterer is different on some tests.  Specifically, the handling of empty clusters has changed.  Empty clusters are not filled with random points in this implementation.  The former behavior is undesirable for a number of reasons, not the least of which is that there is no reasonable use for duplicate cluster centers. To accommodate the change in behavior, the test cases were changed accordingly. This addresses SPARK-3261.
    
    The PointOps trait defines the distance function.  The PointOps trait is more than a simple distance function.  It also defines the types of Points and Centers for the clusterer.  Standard MLlib Vectors are converted into Points and Centers.  In the case of the FastEuclideanOps implementation of PointOps, the Point and Center types include vector norm members. In other distance functions such as the Kullback-Leibler distance function, the Point and Center types include different values that speed up the distance calculation in much the same way that caching vector norms speeds up the Euclidean distance function.  This addresses SPARK-3219.
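
To illustrate the norm-caching idea described above, here is a minimal sketch in plain Scala. It is not the PR's PointOps or FastEuclideanOps code: the names CachedNormSketch, NormedPoint, toPoint and squaredDistance are hypothetical, and the sketch assumes dense arrays rather than MLlib or Breeze vectors. It relies only on the identity ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, so the two norms are computed once per point instead of once per distance evaluation.

```scala
// Hypothetical names throughout; a sketch of norm caching, not the PR's PointOps trait.
object CachedNormSketch {
  // A point that carries its precomputed squared norm alongside the raw values.
  final case class NormedPoint(values: Array[Double], sqNorm: Double)

  // Convert a raw array into a point, computing the squared norm exactly once.
  def toPoint(values: Array[Double]): NormedPoint =
    NormedPoint(values, values.map(v => v * v).sum)

  private def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < a.length) { s += a(i) * b(i); i += 1 }
    s
  }

  // ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, clamped at zero to absorb rounding error.
  def squaredDistance(a: NormedPoint, b: NormedPoint): Double =
    math.max(0.0, a.sqNorm + b.sqNorm - 2.0 * dot(a.values, b.values))
}
```

A distance function such as Kullback-Leibler would presumably cache different per-point values, but the structure (convert once, reuse many times) would be the same.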
    
    This commit also splits up the clusterer into a number of components which behave (largely) like their predecessors. KMeansParallel implements the K-Means || initialization algorithm.  KMeansRandom implements the K-Means Random initialization algorithm.  MultiKMeans implements the general K-Means algorithm on a given distance function.  Traits for the initializer (KMeansInitializer) and the general algorithm (MultiKMeansClusterer) are provided to highlight the salient interfaces.
    
    The KMeansPlusPlus implementation was sped up by NOT recomputing distances to cluster centers that were present in previous steps.  This DRAMATICALLY improves the performance of the K-Means Parallel run. This addresses SPARK-3424.
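
The speed-up described above can be sketched as follows. This is an illustration under stated assumptions, not the PR's KMeansPlusPlus or KMeansParallel code: IncrementalDistanceSketch and updateCosts are hypothetical names, and plain Scala collections stand in for RDDs. The idea is to keep the best-known cost per point and, in each round, compare it only against the centers added in that round.

```scala
// Hypothetical sketch: keep the best-known cost per point and only look at new centers.
object IncrementalDistanceSketch {
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  // costs(i) is the squared distance from points(i) to its closest center so far.
  // Only newCenters are examined; centers from earlier rounds are never revisited.
  def updateCosts(
      points: Seq[Array[Double]],
      costs: Seq[Double],
      newCenters: Seq[Array[Double]]): Seq[Double] =
    points.zip(costs).map { case (p, best) =>
      newCenters.foldLeft(best)((c, center) => math.min(c, squaredDistance(p, center)))
    }
}
```

Starting from costs of Double.PositiveInfinity, each round then costs work proportional to the number of new centers rather than the total number of centers chosen so far.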
    
    The next step is to make different distance functions available through the user-visible API.  This commit does NOT do that.  Rather, the purpose of this commit is to introduce an implementation of a generic K-Means clusterer that uses different distance functions.
    
    The private K-Means constructor which was used by some test Java code was replaced with a Scala constructor that is not Java friendly.  Since the constructor was not user visible, I simply changed the Java test code to use the higher level interface.
    
    This code has been tested (albeit while packaged outside of Spark) and performance measured on data sets of millions of features each with hundreds of dimensions and on tens of thousands of clusters.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/derrickburns/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2419
    
----
commit 9889af70d9571bf8631b6f89a7f824c1d58cd3a4
Author: Derrick Burns <de...@gmail.com>
Date:   2014-09-16T23:00:46Z

    This commit addresses SPARK-3218, SPARK-3219, SPARK-3261, and SPARK-3424.
    
    This commit introduces a general distance function (PointOps) for the KMeans clusterer. There are no public API changes.  However, the behavior of the clusterer is different on some tests.  Specifically, the handling of empty clusters has changed.  Empty clusters are not filled with random points in this implementation.  The former behavior is undesirable for a number of reasons, not the least of which is that there is no reasonable use for duplicate cluster centers. To accommodate the change in behavior, the test cases were changed accordingly. This addresses SPARK-3261.
    
    The PointOps trait defines the distance function.  The PointOps trait is more than a simple distance function.  It also defines the types of Points and Centers for the clusterer.  Standard MLlib Vectors are converted into Points and Centers.  In the case of the FastEuclideanOps implementation of PointOps, the Point and Center types include vector norm members. In other distance functions such as the Kullback-Leibler distance function, the Point and Center types include different values that speed up the distance calculation in much the same way that caching vector norms speeds up the Euclidean distance function.  This addresses SPARK-3219.
    
    This commit also splits up the clusterer into a number of components which behave (largely) like their predecessors. KMeansParallel implements the K-Means || initialization algorithm.  KMeansRandom implements the K-Means Random initialization algorithm.  MultiKMeans implements the general K-Means algorithm on a given distance function.  Traits for the initializer (KMeansInitializer) and the general algorithm (MultiKMeansClusterer) are provided to highlight the salient interfaces.
    
    The KMeansPlusPlus implementation was sped up by NOT recomputing distances to cluster centers that were present in previous steps.  This DRAMATICALLY improves the performance of the K-Means Parallel run. This addresses SPARK-3424.
    
    The next step is to make different distance functions available through the user-visible API.  This commit does NOT do that.  Rather, the purpose of this commit is to introduce an implementation of a generic K-Means clusterer that uses different distance functions.
    
    The private K-Means constructor which was used by some test Java code was replaced with a Scala constructor that is not Java friendly.  Since the constructor was not user visible, I simply changed the Java test code to use the higher level interface.
    
    This code has been tested (albeit while packaged outside of Spark) and performance measured on data sets of millions of features each with hundreds of dimensions and on tens of thousands of clusters.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57026819
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20880/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404356
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21077/consoleFull) for   PR 2419 at commit [`ebdc853`](https://github.com/apache/spark/commit/ebdc853de33373cdd92f5401288d89ab6e879741).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640097
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero}
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +
    +/**
    + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously,
    + * returning the one with the lowest cost.
    + *
    + */
    +
    +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P,C], maxIterations: Int) extends MultiKMeansClusterer[P,C] {
    +
    +  def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = {
    +    val runs = centers.length
    +    val active = Array.fill(runs)(true)
    +    val costs = Array.fill(runs)(Zero)
    +    var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
    +    var iteration = 0
    +
    +    /*
    +     * Execute iterations of Lloyd's algorithm until all runs have converged.
    +     */
    +
    +    while (iteration < maxIterations && activeRuns.nonEmpty) {
    +      // remove the empty clusters
    +      log.info("iteration {}", iteration)
    +
    +      val activeCenters = activeRuns.map(r => centers(r)).toArray
    +
    +      if (log.isInfoEnabled) {
    +        for (r <- 0 until activeCenters.length)
    +          log.info("run {} has {} centers", activeRuns(r), activeCenters(r).length)
    +      }
    +
    +      // Find the sum and count of points mapping to each center
    +      val (centroids, runDistortion) = getCentroids(data, activeCenters)
    +
    +      if (log.isInfoEnabled) {
    +        for (run <- activeRuns) log.info("run {} distortion {}", run, runDistortion(run))
    +      }
    +
    +      for (run <- activeRuns) active(run) = false
    +
    +      for (((runIndex: Int, clusterIndex: Int), cn: Centroid) <- centroids) {
    +        val run = activeRuns(runIndex)
    +        if (cn.isEmpty) {
    +          active(run) = true
    +          centers(run)(clusterIndex) = null.asInstanceOf[C]
    +        } else {
    +          val centroid = pointOps.centroidToPoint(cn)
    +          active(run) = active(run) || pointOps.centerMoved(centroid, centers(run)(clusterIndex))
    +          centers(run)(clusterIndex) = pointOps.pointToCenter(centroid)
    +        }
    +      }
    +
    +      // filter out null centers
    +      for (r <- activeRuns) centers(r) = centers(r).filter(_ != null)
    +
    +      // update distortions and print log message if run completed during this iteration
    +      for ((run, runIndex) <- activeRuns.zipWithIndex) {
    +        costs(run) = runDistortion(runIndex)
    +        if (!active(run)) log.info("run {} finished in {} iterations", run, iteration + 1)
    +      }
    +      activeRuns = activeRuns.filter(active(_))
    +      iteration += 1
    +    }
    +
    +    val best = costs.zipWithIndex.min._2
    +    (costs(best), new GeneralizedKMeansModel(pointOps, centers(best)))
    +  }
    +
    +  def getCentroids(
    +    data: RDD[P],
    +    activeCenters: Array[Array[C]])
    +  : (Array[((Int, Int), Centroid)], Array[Double]) = {
    +    val runDistortion = activeCenters.map(_ => data.sparkContext.accumulator(Zero))
    +    val bcActiveCenters = data.sparkContext.broadcast(activeCenters)
    +    val result = data.mapPartitions { points =>
    +        val bcCenters = bcActiveCenters.value
    +        val centers = bcCenters.map {
    --- End diff --
    
    This uses the number of cluster centers instead of k so that the number of cluster centers may vary.
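
A minimal sketch of what a varying number of centers can look like in one Lloyd iteration. The names DropEmptyClustersSketch and lloydStep are hypothetical, and plain Scala collections with Euclidean distance are assumed rather than the PR's RDD-based getCentroids. Centers that attract no points are simply dropped instead of being refilled with random points.

```scala
// Hypothetical sketch: one Lloyd step that drops centers with no assigned points.
object DropEmptyClustersSketch {
  private def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def lloydStep(
      points: Seq[Array[Double]],
      centers: Seq[Array[Double]]): Seq[Array[Double]] = {
    require(centers.nonEmpty, "at least one center is required")
    // Group points by the index of their closest center.
    val assigned = points.groupBy(p => centers.indices.minBy(i => sqDist(p, centers(i))))
    // A center with no entry in `assigned` is empty and produces no output center.
    centers.indices.flatMap { i =>
      assigned.get(i).map { members =>
        Array.tabulate(members.head.length)(d => members.map(_(d)).sum / members.size)
      }
    }
  }
}
```

Because the result may be shorter than the input, any later step has to size its arrays by the current number of centers rather than by k.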




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55977238
  
    add to whitelist




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57408395
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21078/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640135
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import breeze.linalg.{Vector => BV}
    +import org.apache.spark.rdd.RDD
    +
    +package object base {
    +
    +  val Zero = 0.0
    +  val One = 1.0
    +  val Infinity = Double.MaxValue
    +  val Unknown = -1.0
    +
    +  private[mllib] trait FP extends Serializable {
    +    val weight: Double
    +    val raw: BV[Double]
    +  }
    +
    +  private[mllib] class FPoint(val raw: BV[Double], val weight: Double) extends FP {
    +    override def toString: String = weight + "," + (raw.toArray mkString ",")
    +    lazy val inh = (raw :*  (1.0 / weight)).toArray
    +  }
    +
    +  /**
    +   * A mutable point in homogeneous coordinates
    +   */
    +  private[mllib] class Centroid extends Serializable {
    +    override def toString: String = weight + "," + (raw.toArray mkString ",")
    --- End diff --
    
    We initialize the Centroid lazily.
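
For readers who cannot see the rest of the class, lazy initialization of a mutable accumulator might look roughly like the sketch below. This is an assumption-laden illustration, not the PR's Centroid class: LazyCentroidSketch, MutableCentroid, add and mean are hypothetical names, and dense arrays replace Breeze vectors. The running sum is allocated only when the first point arrives, so a centroid that never receives a point costs almost nothing.

```scala
// Hypothetical sketch of a lazily initialized, mutable centroid accumulator.
object LazyCentroidSketch {
  final class MutableCentroid extends Serializable {
    // Weighted sum of the points seen so far; allocated on the first call to add.
    private var sum: Array[Double] = null
    var weight: Double = 0.0

    def isEmpty: Boolean = weight == 0.0

    def add(values: Array[Double], w: Double): this.type = {
      if (sum == null) sum = new Array[Double](values.length)
      require(values.length == sum.length, "dimension mismatch")
      var i = 0
      while (i < values.length) { sum(i) += values(i) * w; i += 1 }
      weight += w
      this
    }

    // Dividing the weighted sum by the total weight recovers the ordinary mean.
    def mean: Option[Array[Double]] =
      if (isEmpty) None else Some(sum.map(_ / weight))
  }
}
```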




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56127880
  
    I don't understand the test failure. Can someone help me? 
    
    Sent from my iPhone
    
    > On Sep 16, 2014, at 6:59 PM, Nicholas Chammas <no...@github.com> wrote:
    > 
    > cc @mengxr
    > 
    > —
    > Reply to this email directly or view it on GitHub.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404045
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21076/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56242004
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20588/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57020081
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20877/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57028335
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20879/consoleFull) for   PR 2419 at commit [`6ccc8aa`](https://github.com/apache/spark/commit/6ccc8aae4826334206761217457492639276ddc5).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!





[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640037
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero}
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +
    +/**
    + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously,
    + * returning the one with the lowest cost.
    + *
    + */
    +
    +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P,C], maxIterations: Int) extends MultiKMeansClusterer[P,C] {
    +
    +  def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = {
    +    val runs = centers.length
    +    val active = Array.fill(runs)(true)
    +    val costs = Array.fill(runs)(Zero)
    +    var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
    +    var iteration = 0
    +
    +    /*
    +     * Execute iterations of Lloyd's algorithm until all runs have converged.
    +     */
    +
    +    while (iteration < maxIterations && activeRuns.nonEmpty) {
    +      // remove the empty clusters
    +      log.info("iteration {}", iteration)
    +
    +      val activeCenters = activeRuns.map(r => centers(r)).toArray
    +
    +      if (log.isInfoEnabled) {
    +        for (r <- 0 until activeCenters.length)
    +          log.info("run {} has {} centers", activeRuns(r), activeCenters(r).length)
    +      }
    +
    +      // Find the sum and count of points mapping to each center
    +      val (centroids, runDistortion) = getCentroids(data, activeCenters)
    +
    +      if (log.isInfoEnabled) {
    +        for (run <- activeRuns) log.info("run {} distortion {}", run, runDistortion(run))
    +      }
    +
    +      for (run <- activeRuns) active(run) = false
    +
    +      for (((runIndex: Int, clusterIndex: Int), cn: Centroid) <- centroids) {
    +        val run = activeRuns(runIndex)
    +        if (cn.isEmpty) {
    +          active(run) = true
    +          centers(run)(clusterIndex) = null.asInstanceOf[C]
    +        } else {
    +          val centroid = pointOps.centroidToPoint(cn)
    +          active(run) = active(run) || pointOps.centerMoved(centroid, centers(run)(clusterIndex))
    --- End diff --
    
    Notice that we only compute whether a center has moved if we need to.
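
The effect comes from the short-circuit evaluation of `||`: once a run is already marked active, the right-hand side is not evaluated. A small self-contained sketch, with hypothetical names (ShortCircuitSketch, anyMoved) and a simple tolerance test standing in for pointOps.centerMoved, counts how many comparisons are actually performed:

```scala
// Hypothetical sketch: `||` skips the movement test once a moved center has been found.
object ShortCircuitSketch {
  // Returns (whether any center moved, number of movement tests actually performed).
  def anyMoved(pairs: Seq[(Array[Double], Array[Double])], tol: Double): (Boolean, Int) = {
    var moved = false
    var comparisons = 0
    def centerMoved(a: Array[Double], b: Array[Double]): Boolean = {
      comparisons += 1
      a.zip(b).exists { case (x, y) => math.abs(x - y) > tol }
    }
    // The right-hand side is evaluated only while `moved` is still false.
    pairs.foreach { case (oldC, newC) => moved = moved || centerMoved(oldC, newC) }
    (moved, comparisons)
  }
}
```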




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404291
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21077/consoleFull) for   PR 2419 at commit [`ebdc853`](https://github.com/apache/spark/commit/ebdc853de33373cdd92f5401288d89ab6e879741).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639973
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.mllib.base.{FP, PointOps}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +import scala.reflect.ClassTag
    +
    +private[mllib] class KMeansRandom[P <: FP : ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P,C],
    +  k: Int,
    +  runs: Int)
    +  extends KMeansInitializer[P,C] {
    +
    +  def init(data: RDD[P], seed: Int): Array[Array[C]] = {
    +    // Sample all the cluster centers in one pass to avoid repeated scans
    +    val x = data.takeSample(withReplacement=true, runs * k,
    +      new XORShiftRandom().nextInt()).withFilter( x => x.weight > 0)
    --- End diff --
    
    Ensures that no zero-weight points become cluster centers.  This is important for some distance functions that require points with non-zero weights.  When all weights are one (as is the case with the Euclidean distance function) this filter has no effect.
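
A minimal sketch of this kind of initialization, assuming an in-memory collection instead of an RDD and using hypothetical names (RandomInitSketch, Weighted, init). It draws runs * k points with replacement, splits them into one group per run, and then drops zero-weight points, so a run may end up with fewer than k initial centers. Unlike the diff above, this sketch feeds the caller's seed to the generator; that is a choice made here for reproducibility, not a statement about the PR.

```scala
import scala.util.Random

// Hypothetical sketch: sample runs * k candidates, then drop zero-weight points per run.
object RandomInitSketch {
  final case class Weighted(values: Array[Double], weight: Double)

  def init(data: IndexedSeq[Weighted], k: Int, runs: Int, seed: Long): Seq[Seq[Weighted]] = {
    require(data.nonEmpty && k > 0 && runs > 0, "need data, k > 0 and runs > 0")
    val rng = new Random(seed)
    // Sample with replacement in a single pass over the index space.
    val sample = Seq.fill(runs * k)(data(rng.nextInt(data.length)))
    // One group of k candidates per run; zero-weight candidates are discarded.
    sample.grouped(k).map(_.filter(_.weight > 0)).toList
  }
}
```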




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55984228
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20512/consoleFull) for   PR 2419 at commit [`a1f6eb8`](https://github.com/apache/spark/commit/a1f6eb86986324936d6b1f52d35a798e50f0c16d).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55898846
  
    I agree that some of my comments should go in the code. 
    
    As for the Big Bang change, I understand your concern. The distance function touches practically everything. The change in the treatment of the number of clusters is also a broad change. So, while I would prefer to make small incremental changes, these two changes required touching lots of code.
    
    
    
    
    > On Sep 17, 2014, at 12:33 AM, Sean Owen <no...@github.com> wrote:
    > 
    > @derrickburns I think these notes can go in code comments? (They each generate their own email too.)
    > 
    > This is also a big-bang change covering several issues, some of which seem like more focused bug fixes or improvements. I would think it would be easier to break this down further if possible, and get in clear easy changes first.
    > 
    > —
    > Reply to this email directly or view it on GitHub.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55972212
  
    @mengxr per your request, here is a pull request that addresses many of the outstanding issues with the 1.1.0 Spark K-Means clusterer.  




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56236311
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20588/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55968915
  
    @srowen, I moved most of the line comments into code comments that I have committed.  Thx!




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57691032
  
    Uh oh.
    
    
    > On Oct 2, 2014, at 11:35 AM, Nicholas Chammas <no...@github.com> wrote:
    > 
    > @derrickburns Side note: It looks like a lot of extraneous commits got pulled into this PR by mistake. Did you rebase your PR correctly? As it stands, this PR does not merge cleanly.
    > 
    > —
    > Reply to this email directly or view it on GitHub.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55982927
  
    Last build had a [problem fetching from GitHub](https://amplab.cs.berkeley.edu/jenkins/view/Pull%20Request%20Builders/job/SparkPullRequestBuilder/20502/console).
    
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404998
  
    @mengxr 
    1. I fixed the merge issue and also remerged to capture more recent changes.
    2. I did as you suggested and introduced a local variable to hold a value used in the closure.  *Unfortunately, the test still fails.  Since all values used in the closure are local variables, I do not know what to do at this point.*
    3. The best fix to the problem with the test that selects two random centers is the one that I implemented.  It will pass the test deterministically, while still using any random seed. 




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56248490
  
    FYI, I noticed this towards the beginning of the build output:
    
    > fatal: ambiguous argument 'mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala': unknown revision or path not in the working tree.
    Use '--' to separate paths from revisions
    
    Dunno if it has any relevance, but it seems like it might be related to files added in this patch.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640161
  
    --- Diff: mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java ---
    @@ -76,15 +76,12 @@ public void runKMeansUsingConstructor() {
         Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0);
     
         JavaRDD<Vector> data = sc.parallelize(points, 2);
    -    KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd());
    --- End diff --
    
    Since the old KMeans constructor was private, I saw no reason to keep it.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57403972
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21076/consoleFull) for   PR 2419 at commit [`48a31dd`](https://github.com/apache/spark/commit/48a31dd36e1defa7c4279548579bc1874820a5ae).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55992851
  
    Jenkins, retest this please.
    
    On Wed, Sep 17, 2014 at 9:08 PM, Derrick Burns <de...@gmail.com>
    wrote:
    
    > Tests fixed.  Please re-run.
    >
    > On Wed, Sep 17, 2014 at 7:04 PM, Apache Spark QA <notifications@github.com
    > > wrote:
    >
    >> QA tests have finished
    >> <https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20512/consoleFull>
    >> for PR 2419 at commit a1f6eb8
    >> <https://github.com/apache/spark/commit/a1f6eb86986324936d6b1f52d35a798e50f0c16d>
    >> .
    >>
    >>    - This patch *fails* unit tests.
    >>    - This patch merges cleanly.
    >>    - This patch adds no public classes.
    >>
    >> —
    >> Reply to this email directly or view it on GitHub
    >> <https://github.com/apache/spark/pull/2419#issuecomment-55986521>.
    >>
    >
    >




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57679766
  
    @derrickburns Side note: It looks like a lot of [extraneous commits](https://github.com/apache/spark/pull/2419#commits-pushed-471d5f2) got pulled into this PR by mistake. Did you rebase your PR correctly? As it stands, this PR does not merge cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640478
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero}
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +
    +/**
    + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously,
    + * returning the one with the lowest cost.
    + *
    + */
    +
    +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P,C], maxIterations: Int) extends MultiKMeansClusterer[P,C] {
    --- End diff --
    
    pointOps abstracts the distance function.  Notice how it is used herein.
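As a rough illustration of what abstracting the distance function can look like, here is a minimal sketch. All names (`DistanceOps`, `NormedVec`, `CachedNormEuclidean`, `GenericClosest`) are hypothetical and are not the PR's `PointOps` trait. The sketch assumes a squared Euclidean distance that reuses cached squared norms through the identity ||p - c||^2 = ||p||^2 + ||c||^2 - 2 p.c.

```scala
// Hypothetical names throughout; this is not the PR's PointOps trait.
trait DistanceOps[P, C] {
  def toPoint(raw: Array[Double]): P
  def toCenter(p: P): C
  def distance(p: P, c: C): Double
}

// A vector that carries its squared norm so it is computed only once.
final case class NormedVec(values: Array[Double], sqNorm: Double)

object CachedNormEuclidean extends DistanceOps[NormedVec, NormedVec] {
  def toPoint(raw: Array[Double]): NormedVec =
    NormedVec(raw, raw.map(x => x * x).sum)

  def toCenter(p: NormedVec): NormedVec = p

  // Squared Euclidean distance from the cached norms and one dot product.
  def distance(p: NormedVec, c: NormedVec): Double = {
    var dot = 0.0
    var i = 0
    while (i < p.values.length) {
      dot += p.values(i) * c.values(i)
      i += 1
    }
    // Clamp at zero to absorb floating-point round-off.
    math.max(0.0, p.sqNorm + c.sqNorm - 2.0 * dot)
  }
}

object GenericClosest {
  /** Index of the closest center under whichever distance `ops` supplies. */
  def closest[P, C](ops: DistanceOps[P, C], centers: IndexedSeq[C], p: P): Int =
    centers.indices.minBy(i => ops.distance(p, centers(i)))
}
```

The clustering logic in `GenericClosest` never mentions Euclidean geometry; swapping in another `DistanceOps` instance would change the distance without touching it.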




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57028017
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20878/consoleFull) for   PR 2419 at commit [`209b034`](https://github.com/apache/spark/commit/209b0341b623f38bcf2c900aafbf6a5574779b4d).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!





[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55839359
  
    cc @mengxr




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57408391
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21078/consoleFull) for   PR 2419 at commit [`d6f7c66`](https://github.com/apache/spark/commit/d6f7c66dae95a0b6967637d1132bb7327ab41d8c).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56792727
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20789/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56787448
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20789/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57023670
  
    @mengxr  I ran the tests according to your instructions.  One issue remains that I cannot resolve. *I need help with that issue*.
    
    ## Issues
    
    First, the `org.apache.spark.mllib.clustering.KMeansClusterSuite` test "task size should be small in both training and prediction" fails, suggesting that the RDD data is being captured in a closure.  This is quite puzzling. My efforts to solve this problem have failed. I *need help* to solve this problem.
    
    Second, the `org.apache.spark.mllib.clustering.KMeansSuite` "two clusters" unit test fails *intermittently* due to randomization. This is due to a bug that exists in the current 1.1 release. This bug is NOT exacerbated by my pull request. Nevertheless, I have modified my pull request to address this bug.  An explanation of the bug is below.
    
    ## Detail
    
    ### The data capture issue
    
    Please look at line 139 of `org.apache.spark.mllib.clustering.MultiKMeans.scala`. To avoid capturing the data in the task closure, I have created an object with a single function.  That object does NOT capture the data.  I then reference only that object in the anonymous function.  I do not understand how this line causes the data to be captured in the closure of the anonymous function. I *need help* to solve this. (I look forward to the acceptance of Scala SIP-21 ("spores"), which appears to address the issue of contaminated closures cleanly.)
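The thread does not identify what actually drags the data into the closure at that line. For readers unfamiliar with the general problem, the sketch below shows one common mechanism only: a function literal that reads a field keeps a reference to the enclosing instance, so everything that instance holds is serialized with it. `Holder` and `ClosureCaptureSketch` are hypothetical names, and plain Java serialization stands in for Spark's task serialization.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical holder: a large payload next to a small parameter.
class Holder(val payload: Array[Double], val shift: Double) extends Serializable {
  // Reads the field `shift`, so the function keeps a reference to `this`
  // and the payload is serialized along with it.
  def capturesThis: Double => Double = x => x + shift

  // Copies the field into a local val first; only that Double is captured.
  def capturesLocal: Double => Double = {
    val localShift = shift
    x => x + localShift
  }
}

object ClosureCaptureSketch {
  /** Size in bytes of `obj` under plain Java serialization. */
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }
}
```

Comparing `serializedSize(h.capturesThis)` with `serializedSize(h.capturesLocal)` for a `Holder` with a large payload makes the difference visible; the first grows with the payload, the second does not.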
    
    ### The "two clusters" test failure
    
    In the Spark 1.1.0 implementation, the random cluster center initialization chooses centers *with replacement*. In the test case, there are 6 cluster centers to choose from and 2 are requested, so the second draw duplicates the first with probability 1/6. Therefore, we can expect the test to fail 1/6 of the time.   The solution is to change the random initialization to sample without replacement when possible.  I have done this in a commit that I added to this pull request. 
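The 1/6 figure and the proposed remedy can be checked with a small sketch on plain Scala collections. `DistinctCentersSketch` and its methods are hypothetical names; the PR's actual change operates on an RDD and is not shown in this thread. The calculation assumes the candidate points are all distinct.

```scala
import scala.util.Random

object DistinctCentersSketch {
  /** Probability that k draws with replacement from n distinct points contain a repeat. */
  def duplicateProbability(n: Int, k: Int): Double =
    1.0 - (0 until k).map(i => (n - i).toDouble / n).product

  /** Pick min(k, n) distinct points by shuffling; a stand-in for sampling without replacement. */
  def sampleWithoutReplacement[A](points: Seq[A], k: Int, seed: Long): Seq[A] =
    new Random(seed).shuffle(points).take(k)
}
```

For n = 6 and k = 2 the probability is 1 - (6/6)(5/6) = 1/6, matching the failure rate quoted above, while `sampleWithoutReplacement` returns distinct points for any seed.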
    
    ### Another trivial bug fix
     
    I found a bug in my new code where a toString method (which is only called when an exception is thrown) would itself throw a NullPointerException.  I fixed the cause of the NPE.  
    





[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404043
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21076/consoleFull) for   PR 2419 at commit [`48a31dd`](https://github.com/apache/spark/commit/48a31dd36e1defa7c4279548579bc1874820a5ae).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56136714
  
    @derrickburns I cannot see the Jenkins log. Let's call Jenkins again.
    
    test this please




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639842
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.mllib.base.{ PointOps, FP, Zero }
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag](
    +  pointOps: PointOps[P, C],
    +  k: Int,
    +  runs: Int,
    +  initializationSteps: Int,
    +  numPartitions: Int)
    +  extends KMeansInitializer[P, C] with Logging {
    +
    +  /**
    +   * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
    +   * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
    +   * to find  dissimilar cluster centers by starting with a random center and then doing
    +   * passes where more centers are chosen with probability proportional to their squared distance
    +   * to the current cluster set. It results in a provable approximation to an optimal clustering.
    +   *
    +   * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
    +   *
    +   * @param data the RDD of points
    +   * @param seed the random number generator seed
    +   * @return
    +   */
    +  def init(data: RDD[P], seed: Int): Array[Array[C]] = {
    +    log.debug("k-means parallel on {} points" + data.count())
    +
    +    // randomly select one center per run, putting each into a separate array buffer
    +    val sample = data.takeSample(true, runs, seed).toSeq.map(pointOps.pointToCenter)
    +    val centers: Array[ArrayBuffer[C]] = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
    +
    +    // add at most 2k points per step
    +    for (step <- 0 until initializationSteps) {
    +      if (log.isInfoEnabled) showCenters(centers, step)
    +      val centerArrays = centers.map { x: ArrayBuffer[C] => x.toArray }
    +      val bcCenters = data.sparkContext.broadcast(centerArrays)
    +      for ((r, p) <- choose(data, seed, step, bcCenters)) {
    +        centers(r) += pointOps.pointToCenter(p)
    +      }
    +      bcCenters.unpersist()
    +    }
    +
    +    val bcCenters = data.sparkContext.broadcast(centers.map(_.toArray))
    +    val result = finalCenters(data, bcCenters, seed)
    +    bcCenters.unpersist()
    +    result
    +  }
    +
    +  def showCenters(centers: Array[ArrayBuffer[C]], step: Int) {
    +    log.info("step {}", step)
    +    for (run <- 0 until runs) {
    +      log.info("final: run {} has {} centers", run, centers.length)
    +    }
    +  }
    +
    +  /**
    +   * Randomly choose at most 2 * k  additional cluster centers by weighting them by their distance
    +   * to the current closest cluster
    +   *
    +   * @param data  the RDD of points
    +   * @param seed  random generator seed
    +   * @param step  which step of the selection process
    +   * @return  array of (run, point)
    +   */
    +  def choose(data: RDD[P], seed: Int, step: Int, bcCenters: Broadcast[Array[Array[C]]])
    +  : Array[(Int, P)] = {
    +    // compute the weighted distortion for each run
    +    val sumCosts = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          (r, point.weight * pointOps.pointCost(centers(r), point))
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    // choose points in proportion to ratio of weighted cost to weighted distortion
    +    data.mapPartitionsWithIndex {
    +      (index, points: Iterator[P]) =>
    +        val centers = bcCenters.value
    +        val range = 0 until runs
    +        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
    +        points.flatMap { p =>
    +          range.filter { r =>
    +            rand.nextDouble() < 2.0 * p.weight * pointOps.pointCost(centers(r), p) * k / sumCosts(r)
    +          }.map((_, p))
    +        }
    +    }.collect()
    +  }
    +
    +  /**
    +   * Reduce sets of candidate cluster centers to at most k points per set using KMeansPlusPlus.
    +   * Weight the points by the distance to the closest cluster center.
    +   *
    +   * @param data  original points
    +   * @param bcCenters  array of sets of candidate centers
    +   * @param seed  random number seed
    +   * @return  array of sets of cluster centers
    +   */
    +  def finalCenters(data: RDD[P], bcCenters: Broadcast[Array[Array[C]]], seed: Int)
    +  : Array[Array[C]] = {
    +    // for each (run, cluster) compute the sum of the weights of the points in the cluster
    +    val weightMap = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          ((r, pointOps.findClosest(centers(r), point)._1), point.weight)
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    val centers = bcCenters.value
    +    val kmeansPlusPlus = new KMeansPlusPlus(pointOps)
    +    val trackingKmeans = new MultiKMeans(pointOps, 30)
    +    val finalCenters = (0 until runs).map {
    +      r =>
    +        val myCenters = centers(r).toArray
    +        log.info("run {} has {} centers", r, myCenters.length)
    +        val weights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), Zero)).toArray
    +        val kx = if (k > myCenters.length) myCenters.length else k
    +        val sc = data.sparkContext
    +        val initial = kmeansPlusPlus.getCenters(sc, seed, myCenters, weights, kx, numPartitions, 1)
    +        trackingKmeans.cluster(data.sparkContext.parallelize(myCenters.map(pointOps.centerToPoint)),
    --- End diff --
    
    Uses the general k-means algorithm here on the sample set instead of a "LocalKMeans" implementation.
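
For readers following the diff above, here is a minimal sketch of the oversampling pass that `choose` performs, written without Spark so it runs on its own. `OversampleStep` and its parameter names are hypothetical and not part of the patch; `costs(i)` is assumed to be the already-computed cost of point i to its closest current center. Each point is kept independently with probability min(1, 2 * k * weight * cost / total), so a pass adds about 2k candidates in expectation.

```scala
import scala.util.Random

// Hypothetical, Spark-free illustration of one k-means|| oversampling pass.
object OversampleStep {
  /** Returns the indices of the points selected as new candidate centers. */
  def choose(costs: Array[Double], weights: Array[Double], k: Int, rand: Random): Seq[Int] = {
    // weighted distortion of the whole data set under the current centers
    val total = costs.indices.map(i => weights(i) * costs(i)).sum
    if (total <= 0.0) {
      Seq.empty // every point already coincides with a center
    } else {
      costs.indices.filter { i =>
        // keep point i with probability proportional to its share of the distortion
        rand.nextDouble() < 2.0 * k * weights(i) * costs(i) / total
      }
    }
  }
}
```

A point whose cost is zero is never selected, while a point that carries most of the distortion is selected almost surely.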


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404357
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21077/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55986521
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20512/consoleFull) for   PR 2419 at commit [`a1f6eb8`](https://github.com/apache/spark/commit/a1f6eb86986324936d6b1f52d35a798e50f0c16d).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640107
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala ---
    @@ -0,0 +1,28 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.base.FP
    +import org.apache.spark.rdd.RDD
    +
    +
    +private[mllib] trait MultiKMeansClusterer[P <: FP, C <: FP] extends Serializable with Logging {
    --- End diff --
    
    A future pull request will have a different implementation of this trait.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639860
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.mllib.base.{ PointOps, FP, Zero }
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag](
    +  pointOps: PointOps[P, C],
    +  k: Int,
    +  runs: Int,
    +  initializationSteps: Int,
    +  numPartitions: Int)
    +  extends KMeansInitializer[P, C] with Logging {
    +
    +  /**
    +   * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
    +   * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
    +   * to find  dissimilar cluster centers by starting with a random center and then doing
    +   * passes where more centers are chosen with probability proportional to their squared distance
    +   * to the current cluster set. It results in a provable approximation to an optimal clustering.
    +   *
    +   * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
    +   *
    +   * @param data the RDD of points
    +   * @param seed the random number generator seed
    +   * @return
    +   */
    +  def init(data: RDD[P], seed: Int): Array[Array[C]] = {
    +    log.debug("k-means parallel on {} points" + data.count())
    +
    +    // randomly select one center per run, putting each into a separate array buffer
    +    val sample = data.takeSample(true, runs, seed).toSeq.map(pointOps.pointToCenter)
    +    val centers: Array[ArrayBuffer[C]] = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
    +
    +    // add at most 2k points per step
    +    for (step <- 0 until initializationSteps) {
    +      if (log.isInfoEnabled) showCenters(centers, step)
    +      val centerArrays = centers.map { x: ArrayBuffer[C] => x.toArray }
    +      val bcCenters = data.sparkContext.broadcast(centerArrays)
    +      for ((r, p) <- choose(data, seed, step, bcCenters)) {
    +        centers(r) += pointOps.pointToCenter(p)
    +      }
    +      bcCenters.unpersist()
    +    }
    +
    +    val bcCenters = data.sparkContext.broadcast(centers.map(_.toArray))
    +    val result = finalCenters(data, bcCenters, seed)
    +    bcCenters.unpersist()
    +    result
    +  }
    +
    +  def showCenters(centers: Array[ArrayBuffer[C]], step: Int) {
    +    log.info("step {}", step)
    +    for (run <- 0 until runs) {
    +      log.info("final: run {} has {} centers", run, centers(run).length)
    +    }
    +  }
    +
    +  /**
    +   * Randomly choose at most 2 * k  additional cluster centers by weighting them by their distance
    +   * to the current closest cluster
    +   *
    +   * @param data  the RDD of points
    +   * @param seed  random generator seed
    +   * @param step  which step of the selection process
    +   * @return  array of (run, point)
    +   */
    +  def choose(data: RDD[P], seed: Int, step: Int, bcCenters: Broadcast[Array[Array[C]]])
    +  : Array[(Int, P)] = {
    +    // compute the weighted distortion for each run
    +    val sumCosts = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          (r, point.weight * pointOps.pointCost(centers(r), point))
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    // choose points in proportion to ratio of weighted cost to weighted distortion
    +    data.mapPartitionsWithIndex {
    +      (index, points: Iterator[P]) =>
    +        val centers = bcCenters.value
    +        val range = 0 until runs
    +        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
    +        points.flatMap { p =>
    +          range.filter { r =>
    +            rand.nextDouble() < 2.0 * p.weight * pointOps.pointCost(centers(r), p) * k / sumCosts(r)
    +          }.map((_, p))
    +        }
    +    }.collect()
    +  }
    +
    +  /**
    +   * Reduce sets of candidate cluster centers to at most k points per set using KMeansPlusPlus.
    +   * Weight the points by the distance to the closest cluster center.
    +   *
    +   * @param data  original points
    +   * @param bcCenters  array of sets of candidate centers
    +   * @param seed  random number seed
    +   * @return  array of sets of cluster centers
    +   */
    +  def finalCenters(data: RDD[P], bcCenters: Broadcast[Array[Array[C]]], seed: Int)
    +  : Array[Array[C]] = {
    +    // for each (run, cluster) compute the sum of the weights of the points in the cluster
    +    val weightMap = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          ((r, pointOps.findClosest(centers(r), point)._1), point.weight)
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    val centers = bcCenters.value
    +    val kmeansPlusPlus = new KMeansPlusPlus(pointOps)
    +    val trackingKmeans = new MultiKMeans(pointOps, 30)
    +    val finalCenters = (0 until runs).map {
    +      r =>
    +        val myCenters = centers(r).toArray
    +        log.info("run {} has {} centers", r, myCenters.length)
    +        val weights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), Zero)).toArray
    +        val kx = if (k > myCenters.length) myCenters.length else k
    --- End diff --
    
    Checks for the degenerate case where k is greater than the number of candidate centers available.
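
The check can be illustrated in isolation. This is only a sketch: `ClampK` and its method names are hypothetical, and the candidate centers are plain arrays rather than the patch's `C` type.

```scala
// Hypothetical helper, not part of the patch.
object ClampK {
  /** Number of centers that can actually be produced from the candidates. */
  def effectiveK(k: Int, candidates: Int): Int = math.min(k, candidates)

  /** Keeps at most k candidates, never asking for more than exist. */
  def takeCenters[A](candidates: Array[A], k: Int): Array[A] =
    candidates.take(effectiveK(k, candidates.length))
}
```

With three candidates and k = 5, `effectiveK` yields 3, so the reduction step is asked only for centers that exist.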




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56898486
  
    @derrickburns Some old k-means tests failed. Could you try `sbt/sbt clean assembly` and then `sbt/sbt mllib/test` on your local machine?




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639992
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala ---
    @@ -1,127 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.spark.mllib.clustering
    -
    -import scala.util.Random
    -
    -import breeze.linalg.{Vector => BV, DenseVector => BDV, norm => breezeNorm}
    -
    -import org.apache.spark.Logging
    -
    -/**
    - * An utility object to run K-means locally. This is private to the ML package because it's used
    - * in the initialization of KMeans but not meant to be publicly exposed.
    - */
    -private[mllib] object LocalKMeans extends Logging {
    -
    -  /**
    -   * Run K-means++ on the weighted point set `points`. This first does the K-means++
    -   * initialization procedure and then rounds of Lloyd's algorithm.
    -   */
    -  def kMeansPlusPlus(
    -      seed: Int,
    -      points: Array[BreezeVectorWithNorm],
    -      weights: Array[Double],
    -      k: Int,
    -      maxIterations: Int
    -  ): Array[BreezeVectorWithNorm] = {
    -    val rand = new Random(seed)
    -    val dimensions = points(0).vector.length
    -    val centers = new Array[BreezeVectorWithNorm](k)
    -
    -    // Initialize centers by sampling using the k-means++ procedure.
    -    centers(0) = pickWeighted(rand, points, weights).toDense
    -    for (i <- 1 until k) {
    -      // Pick the next center with a probability proportional to cost under current centers
    -      val curCenters = centers.view.take(i)
    -      val sum = points.view.zip(weights).map { case (p, w) =>
    -        w * KMeans.pointCost(curCenters, p)
    -      }.sum
    -      val r = rand.nextDouble() * sum
    -      var cumulativeScore = 0.0
    -      var j = 0
    -      while (j < points.length && cumulativeScore < r) {
    -        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
    -        j += 1
    -      }
    -      if (j == 0) {
    -        logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
    -          s" Using duplicate point for center k = $i.")
    -        centers(i) = points(0).toDense
    --- End diff --
    
    This is the problematic line.  It adds a duplicate point as a cluster center, and there is no reasonable use for duplicate cluster centers.
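
One way to avoid the duplicate is to stop seeding early when no distinct point remains. The following is a minimal sketch under that assumption, not the code from the patch; `SeedWithoutDuplicates` and its helpers are hypothetical names, points are plain `Array[Double]`, and squared Euclidean distance stands in for the general distance function.

```scala
import scala.util.Random

// Hypothetical illustration: k-means++ style seeding that returns fewer than
// k centers instead of reusing a point that is already a center.
object SeedWithoutDuplicates {
  type Point = Array[Double]

  def sqDist(a: Point, b: Point): Double = {
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  /** Cost of a point: squared distance to its closest current center. */
  def cost(centers: Seq[Point], p: Point): Double = centers.map(sqDist(_, p)).min

  def seed(points: Array[Point], weights: Array[Double], k: Int, rand: Random): Vector[Point] = {
    require(points.nonEmpty && points.length == weights.length)
    var centers = Vector(points(rand.nextInt(points.length)))
    var exhausted = false
    while (centers.length < k && !exhausted) {
      val costs = points.indices.map(i => weights(i) * cost(centers, points(i)))
      val total = costs.sum
      if (total <= 0.0) {
        // every remaining point coincides with a center: stop, do not duplicate
        exhausted = true
      } else {
        // pick index j with probability costs(j) / total
        val r = rand.nextDouble() * total
        var cumulative = 0.0
        var j = 0
        while (j < points.length - 1 && cumulative + costs(j) <= r) {
          cumulative += costs(j)
          j += 1
        }
        centers = centers :+ points(j)
      }
    }
    centers
  }
}
```

Callers then have to accept a result with fewer than k centers, which matches the changed handling of empty clusters described in the pull request.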




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57028340
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20879/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56775648
  
    I know that this may not be the most pressing issue for anyone on Spark, but I would like to complete this work.  I cannot do that without some help. @mengxr




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57021352
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20879/consoleFull) for   PR 2419 at commit [`6ccc8aa`](https://github.com/apache/spark/commit/6ccc8aae4826334206761217457492639276ddc5).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57012356
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20871/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57233504
  
    @derrickburns Could you merge the PR cleanly? For the closure serialization, just go with the spores style: assign the class member to a val and reference the val in the closure. Creating a new class with a single object is not necessary.
    
    For randomization, we should try to make the algorithm deterministic given a random seed. Then we can fix the random seed for tests. 
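
Both suggestions can be sketched without Spark. The class and object names below are hypothetical, and plain Java serialization stands in for Spark's closure serializer. The first method references a class member inside the closure and so should capture the enclosing, non-serializable instance; the second copies the member to a local val first, so only an `Int` is captured. `draws` shows that a generator built from a fixed seed reproduces the same sequence.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.Random

// Hypothetical stand-in for a clusterer; deliberately NOT Serializable.
class Clusterer(val k: Int) {
  // References the member `k`, so the closure holds a reference to `this`.
  def scaleCapturingThis: Double => Double = x => x * k

  // Copies the member to a local val; the closure holds only that value.
  def scaleCapturingVal: Double => Double = {
    val localK = k
    x => x * localK
  }
}

object ClosureCapture {
  /** True if the object survives Java serialization. */
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  /** The same seed always yields the same sequence of draws. */
  def draws(seed: Long, n: Int): Seq[Double] = {
    val rand = new Random(seed)
    Seq.fill(n)(rand.nextDouble())
  }
}
```

Passing each closure to `isSerializable` makes the difference visible, and comparing two calls to `draws` with the same seed is the property a test with a fixed seed relies on.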




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56235934
  
    test this please




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56285643
  
    I deleted that file in my original pull request.
    
    On Fri, Sep 19, 2014 at 4:32 PM, Nicholas Chammas
    <no...@github.com> wrote:
    
    > FYI, I noticed this towards the beginning of the build output:
    >> fatal: ambiguous argument 'mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala': unknown revision or path not in the working tree.
    > Use '--' to separate paths from revisions
    > Dunno if it has any relevance, but it seems like it might be related to files added in this patch.
    > ---
    > Reply to this email directly or view it on GitHub:
    > https://github.com/apache/spark/pull/2419#issuecomment-56248490




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640379
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import breeze.linalg.{Vector => BV}
    +import org.apache.spark.rdd.RDD
    +
    +package object base {
    +
    +  val Zero = 0.0
    +  val One = 1.0
    +  val Infinity = Double.MaxValue
    +  val Unknown = -1.0
    +
    +  private[mllib] trait FP extends Serializable {
    +    val weight: Double
    +    val raw: BV[Double]
    +  }
    +
    +  private[mllib] class FPoint(val raw: BV[Double], val weight: Double) extends FP {
    +    override def toString: String = weight + "," + (raw.toArray mkString ",")
    +    lazy val inh = (raw :*  (1.0 / weight)).toArray
    +  }
    +
    +  /**
    +   * A mutable point in homogeneous coordinates
    +   */
    +  private[mllib] class Centroid extends Serializable {
    +    override def toString: String = weight + "," + (raw.toArray mkString ",")
    +
    +    def isEmpty = weight == Zero
    +
    +    var raw: BV[Double] = null
    +
    +    var weight: Double = Zero
    +
    +    def add(p: Centroid): this.type = add(p.raw, p.weight)
    +
    +    def add(p: FP): this.type = add(p.raw, p.weight)
    +
    +    def sub(p: Centroid): this.type = sub(p.raw, p.weight)
    +
    +    def sub(p: FP): this.type = sub(p.raw, p.weight)
    +
    +    def sub(r: BV[Double], w: Double): this.type = {
    +      if (r != null) {
    +        if (raw == null) {
    +          raw = r.toVector :*= -1.0
    +          weight = w * -1
    +        } else {
    +          raw -= r
    +          weight = weight - w
    +        }
    +      }
    +      this
    +    }
    +
    +    def add(r: BV[Double], w: Double) : this.type = {
    +      if (r != null) {
    +        if (raw == null) {
    +          raw = r.toVector
    +          weight = w
    +        } else {
    +          raw += r
    +          weight = weight + w
    +        }
    +      }
    +      this
    +    }
    +  }
    +
    +  private[mllib] trait PointOps[P <: FP, C <: FP] {
    --- End diff --
    
    This is the distance function trait. For now, only two implementations are provided.  A future pull request will provide others.  I have implemented the Kullback-Leibler distance function successfully with this interface.  I see no reason why any other distance function cannot be implemented efficiently with this interface, since it provides the opportunity to cache data per point and center.
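
To make the caching idea concrete, here is a minimal sketch of a distance-function interface whose point type carries precomputed data. It is not the `PointOps` trait from the patch: `DistanceOps`, `NormedPoint` and `FastEuclidean` are hypothetical names, and only the squared Euclidean case with a cached squared norm is shown.

```scala
// Hypothetical interface: the point type P can carry whatever per-point data
// the distance function wants to compute once and reuse.
trait DistanceOps[P] {
  def toPoint(raw: Array[Double]): P
  def distance(a: P, b: P): Double
}

// A point together with its cached squared norm.
final case class NormedPoint(raw: Array[Double], sqNorm: Double)

object FastEuclidean extends DistanceOps[NormedPoint] {
  def toPoint(raw: Array[Double]): NormedPoint =
    NormedPoint(raw, raw.map(x => x * x).sum)

  // ||a - b||^2 = ||a||^2 + ||b||^2 - 2 (a . b); the norms come from the cache,
  // so each call only needs one dot product.
  def distance(a: NormedPoint, b: NormedPoint): Double = {
    var dot = 0.0
    var i = 0
    while (i < a.raw.length) { dot += a.raw(i) * b.raw(i); i += 1 }
    // clamp small negative values caused by floating-point rounding
    math.max(0.0, a.sqNorm + b.sqNorm - 2.0 * dot)
  }
}
```

Another distance function would supply its own point type with different cached values, which is the flexibility the comment above describes.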




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns closed the pull request at:

    https://github.com/apache/spark/pull/2419




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56422112
  
    I still need some help determining why some tests fail.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56907546
  
    @mengxr I will do as you suggest.
    
    On Thu, Sep 25, 2014 at 4:15 PM, Xiangrui Meng <no...@github.com>
    wrote:
    
    > @derrickburns <https://github.com/derrickburns> Some old k-means tests
    > failed. Could you try sbt/sbt clean assembly and then sbt/sbt mllib/test
    > on your local machine?
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2419#issuecomment-56898486>.
    >




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57404968
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21078/consoleFull) for   PR 2419 at commit [`d6f7c66`](https://github.com/apache/spark/commit/d6f7c66dae95a0b6967637d1132bb7327ab41d8c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57019942
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20877/consoleFull) for   PR 2419 at commit [`481e5ec`](https://github.com/apache/spark/commit/481e5ec48fad87918f9e2a84871cf37b6af79b5d).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640049
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero}
    +import org.apache.spark.rdd.RDD
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +
    +/**
    + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously,
    + * returning the one with the lowest cost.
    + *
    + */
    +
    +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P,C], maxIterations: Int) extends MultiKMeansClusterer[P,C] {
    +
    +  def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = {
    +    val runs = centers.length
    +    val active = Array.fill(runs)(true)
    +    val costs = Array.fill(runs)(Zero)
    +    var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
    +    var iteration = 0
    +
    +    /*
    +     * Execute iterations of Lloyd's algorithm until all runs have converged.
    +     */
    +
    +    while (iteration < maxIterations && activeRuns.nonEmpty) {
    +      // remove the empty clusters
    +      log.info("iteration {}", iteration)
    +
    +      val activeCenters = activeRuns.map(r => centers(r)).toArray
    +
    +      if (log.isInfoEnabled) {
    +        for (r <- 0 until activeCenters.length)
    +          log.info("run {} has {} centers", activeRuns(r), activeCenters(r).length)
    +      }
    +
    +      // Find the sum and count of points mapping to each center
    +      val (centroids, runDistortion) = getCentroids(data, activeCenters)
    +
    +      if (log.isInfoEnabled) {
    +        for (run <- activeRuns) log.info("run {} distortion {}", run, runDistortion(run))
    +      }
    +
    +      for (run <- activeRuns) active(run) = false
    +
    +      for (((runIndex: Int, clusterIndex: Int), cn: Centroid) <- centroids) {
    +        val run = activeRuns(runIndex)
    +        if (cn.isEmpty) {
    +          active(run) = true
    +          centers(run)(clusterIndex) = null.asInstanceOf[C]
    --- End diff --
    
    Use null instead of Option to save space.
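
A small sketch of the trade-off, under assumptions: Center and EmptyClusters are hypothetical names, not types from this pull request. Wrapping each center in Some allocates one extra object per slot, whereas a null sentinel reuses the array slot itself; the cost is that empty slots must be filtered out before the centers are used again.

```scala
// Hypothetical center type for illustration only.
final case class Center(values: Array[Double])

object EmptyClusters {
  // Empty clusters are marked with null; compact the array before the next iteration.
  def dropEmpty(centers: Array[Center]): Array[Center] = centers.filter(_ != null)
}
```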




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640123
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering.metrics
    +
    +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
    +import org.apache.spark.mllib.base.{Centroid, FPoint, PointOps, Infinity, Zero}
    +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
    +
    +
    +/**
    + * Euclidean distance measure
    + */
    +class EuclideanOps extends PointOps[FPoint, FPoint] with Serializable {
    +
    +  type C = FPoint
    +  type P = FPoint
    +
    +  val epsilon = 1e-4
    +
    +  def distance(p: P, c: C, upperBound: Double = Infinity): Double = {
    +    val d = p.inh.zip(c.inh).foldLeft(Zero) {
    +      case (d: Double, (a: Double, b: Double)) => d + (a - b) * (a - b)
    --- End diff --
    
    This is the slow algorithm, shown here simply for clarity.  The FastEuclideanOps class is the faster implementation of the Euclidean distance function.
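
For comparison, the two approaches can be sketched side by side. This is an illustration under assumptions, not the EuclideanOps or FastEuclideanOps code from the pull request: EuclideanSketch and its method names are hypothetical, and both functions return the squared distance.

```scala
object EuclideanSketch {
  // Direct form: subtract, square and accumulate for every coordinate.
  def slowSquaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).foldLeft(0.0) { case (acc, (x, y)) => acc + (x - y) * (x - y) }

  // Computed once per vector and then cached alongside it.
  def squaredNorm(a: Array[Double]): Double = a.foldLeft(0.0)((acc, x) => acc + x * x)

  // Norm-cached form: ||a - b||^2 = ||a||^2 + ||b||^2 - 2 (a . b).
  // Only the dot product is computed per call; max guards against small negative
  // results caused by floating-point cancellation.
  def fastSquaredDistance(
      a: Array[Double], normA: Double,
      b: Array[Double], normB: Double): Double = {
    var dot = 0.0
    var i = 0
    while (i < a.length) {
      dot += a(i) * b(i)
      i += 1
    }
    math.max(0.0, normA + normB - 2.0 * dot)
  }
}
```

The cached form avoids the intermediate tuple allocations of zip and needs one multiply-add per coordinate, at the price of some loss of precision when the two vectors are nearly identical.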




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56792730
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20789/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640296
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala ---
    @@ -87,7 +87,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     
         // Make sure code runs.
         var model = KMeans.train(data, k=3, maxIterations=1)
    -    assert(model.clusterCenters.size === 3)
    +    assert(model.clusterCenters.size === 2)
    --- End diff --
    
    Similarly, this change is also the result of not keeping empty clusters.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r18184806
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
    @@ -17,16 +17,13 @@
     
     package org.apache.spark.mllib.clustering
     
    -import scala.collection.mutable.ArrayBuffer
     
    -import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
    -
    -import org.apache.spark.annotation.Experimental
    -import org.apache.spark.Logging
    -import org.apache.spark.SparkContext._
    -import org.apache.spark.mllib.linalg.{Vector, Vectors}
    -import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.mllib.base.{FP, PointOps}
    +import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps
     import org.apache.spark.rdd.RDD
    +<<<<<<< HEAD
    --- End diff --
    
    This file did not merge cleanly; a conflict marker was left in the diff.




[GitHub] spark pull request: This commit addresses SPARK-3218, SPARK-3219, ...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55830264
  
    Thanks @nchammas, I added the Apache license headers.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55992818
  
    Tests fixed.  Please re-run.
    
    On Wed, Sep 17, 2014 at 7:04 PM, Apache Spark QA <no...@github.com>
    wrote:
    
    > QA tests have finished
    > <https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20512/consoleFull>
    > for PR 2419 at commit a1f6eb8
    > <https://github.com/apache/spark/commit/a1f6eb86986324936d6b1f52d35a798e50f0c16d>
    > .
    >
    >    - This patch *fails* unit tests.
    >    - This patch merges cleanly.
    >    - This patch adds no public classes.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2419#issuecomment-55986521>.
    >




[GitHub] spark pull request: This commit addresses SPARK-3218, SPARK-3219, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55826873
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639919
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.mllib.base.{PointOps, FP, Infinity, One, Zero}
    +import org.apache.spark.util.random.XORShiftRandom
    +import org.apache.spark.{Logging, SparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +/**
    + *
    + * The KMeans++ initialization algorithm
    + *
    + * @param pointOps distance function
    + * @tparam P point type
    + * @tparam C center type
    + */
    +private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag](
    +  pointOps: PointOps[P, C]) extends Serializable with Logging {
    +
    +  /**
    +   * We will maintain for each point the distance to its closest cluster center.
    +   * Since only one center is added on each iteration, updating that distance
    +   * only requires computing the distance to the new cluster center and keeping it
    +   * if it is less than the current closest distance.
    +   */
    +  case class FatPoint(location: P, index: Int, weight: Double, distance: Double)
    +
    +  /**
    +   * K-means++ on the weighted point set `points`. This first does the K-means++
    +   * initialization procedure and then rounds of Lloyd's algorithm.
    +   */
    +
    +  def cluster(
    +               sc: SparkContext,
    +               seed: Int,
    +               points: Array[C],
    +               weights: Array[Double],
    +               k: Int,
    +               maxIterations: Int,
    +               numPartitions: Int): Array[C] = {
    +    val centers: Array[C] = getCenters(sc, seed, points, weights, k, numPartitions, 1)
    +    val pts =  sc.parallelize(points.map(pointOps.centerToPoint))
    +    new MultiKMeans(pointOps, maxIterations).cluster(pts, Array(centers))._2.centers
    +  }
    +
    +  /**
    +   * Select centers in rounds.  On each round, select 'perRound' centers, with probability of
    +   * selection equal to the product of the given weights and distance to the closest cluster center
    +   * of the previous round.
    +   *
    +   * @param sc the Spark context
    +   * @param seed a random number seed
    +   * @param points  the candidate centers
    +   * @param weights  the weights on the candidate centers
    +   * @param k  the total number of centers to select
    +   * @param numPartitions the number of data partitions to use
    +   * @param perRound the number of centers to add per round
    +   * @return   an array of at most k cluster centers
    +   */
    +  def getCenters(sc: SparkContext, seed: Int, points: Array[C], weights: Array[Double], k: Int,
    +                 numPartitions: Int, perRound: Int): Array[C] = {
    +    assert(points.length > 0)
    +    assert(k > 0)
    +    assert(numPartitions > 0)
    +    assert(perRound > 0)
    +
    +    if (points.length < k) log.warn("number of clusters requested {} exceeds number of points {}",
    +      k, points.length)
    +    val centers = new ArrayBuffer[C](k)
    +    val rand = new XORShiftRandom(seed)
    +    centers += points(pickWeighted(rand, weights))
    +    log.info("starting kMeansPlusPlus initialization on {} points", points.length)
    +
    +    var more = true
    +    var fatPoints = initialFatPoints(points, weights)
    +    fatPoints = updateDistances(fatPoints, centers.view.take(1))
    +
    +    while (centers.length < k && more) {
    +      val chosen = choose(fatPoints, seed ^ (centers.length << 24), rand, perRound)
    +      val newCenters = chosen.map(points(_))
    +      fatPoints = updateDistances(fatPoints, newCenters)
    +      log.info("chose {} points", chosen.length)
    +      for (index <- chosen) {
    +        log.info("  center({}) = points({})", centers.length, index)
    +        centers += points(index)
    +      }
    +      more = chosen.nonEmpty
    +    }
    +    val result = centers.take(k)
    +    log.info("completed kMeansPlusPlus initialization with {} centers of {} requested",
    +      result.length, k)
    +    result.toArray
    +  }
    +
    +  /**
    +   * Choose points
    +   *
    +   * @param fatPoints points to choose from
    +   * @param seed  random number seed
    +   * @param rand  random number generator
    +   * @param count number of points to choose
    +   * @return indices of chosen points
    +   */
    +  def choose(fatPoints: Array[FatPoint], seed: Int, rand: XORShiftRandom, count: Int) =
    +    (0 until count).flatMap { x => pickCenter(rand, fatPoints.iterator)}.map { _.index}
    +
    +  /**
    +   * Create initial fat points with weights given and infinite distance to closest cluster center.
    +   * @param points points
    +   * @param weights weights of points
    +   * @return fat points with given weights and infinite distance to closest cluster center
    +   */
    +  def initialFatPoints(points: Array[C], weights: Array[Double]): Array[FatPoint] =
    +    (0 until points.length).map{ i => FatPoint( pointOps.centerToPoint(points(i)), i, weights(i),
    +      Infinity)}.toArray
    +
    +  /**
    +   * Update the distance of each point to its closest cluster center, considering only the
    +   * newly added cluster centers.
    +   *
    +   * @param points set of candidate initial cluster centers
    +   * @param center new cluster centers
    +   * @return  points with their distance to the closest cluster center updated
    +   */
    +
    +  def updateDistances(points: Array[FatPoint], center: Seq[C]): Array[FatPoint] =
    +    points.map { p =>
    +      var i = 0
    +      val to = center.length
    +      var dist = p.distance
    +      val point = p.location
    +      while (i < to) {
    +        dist = pointOps.distance(point, center(i), dist)
    +        i = i + 1
    +      }
    +      p.copy(distance=dist)
    +    }
    +
    +  /**
    +   * Pick a point at random, weighting the choices by the given weight vector.
    +   * Throws an IllegalArgumentException if all weights are 0.0.
    +   *
    +   * @param rand  random number generator
    +   * @param weights  the weights of the points
    +   * @return the index of the point chosen
    +   */
    +  def pickWeighted(rand: XORShiftRandom, weights: Array[Double]): Int = {
    +    val r = rand.nextDouble() * weights.sum
    +    var i = 0
    +    var curWeight = 0.0
    +    while (i < weights.length && curWeight < r) {
    +      assert(weights(i) >= 0.0)
    +      curWeight += weights(i)
    +      i += 1
    +    }
    +    if (i == 0) throw new IllegalArgumentException("all weights are zero")
    --- End diff --
    
    Checks for illegal weight vector and throws exception instead of returning -1
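
A self-contained sketch of weighted selection that fails fast on a degenerate weight vector. This is not the pickWeighted method from the diff: WeightedPick is a hypothetical name, scala.util.Random stands in for XORShiftRandom, and the explicit total check is an assumption about how the guard could be written.

```scala
import scala.util.Random

object WeightedPick {
  // Returns index i with probability weights(i) / weights.sum.
  def pickWeighted(rand: Random, weights: Array[Double]): Int = {
    require(weights.forall(_ >= 0.0), "weights must be non-negative")
    val total = weights.sum
    // Covers both an empty vector and a vector of zeros.
    if (total <= 0.0) throw new IllegalArgumentException("all weights are zero")
    val r = rand.nextDouble() * total
    var i = 0
    var cumulative = weights(0)
    // Advance until the running sum exceeds r; zero-weight entries are skipped.
    while (i < weights.length - 1 && cumulative <= r) {
      i += 1
      cumulative += weights(i)
    }
    i
  }
}
```

Throwing here surfaces the bad input at its source, whereas a -1 return value would have to be checked by every caller before it is used as an array index.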




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17639873
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.mllib.base.{ PointOps, FP, Zero }
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.reflect.ClassTag
    +
    +private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag](
    +  pointOps: PointOps[P, C],
    +  k: Int,
    +  runs: Int,
    +  initializationSteps: Int,
    +  numPartitions: Int)
    +  extends KMeansInitializer[P, C] with Logging {
    +
    +  /**
    +   * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al.
    +   * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries
    +   * to find dissimilar cluster centers by starting with a random center and then doing
    +   * passes where more centers are chosen with probability proportional to their squared distance
    +   * to the current cluster set. It results in a provable approximation to an optimal clustering.
    +   *
    +   * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
    +   *
    +   * @param data the RDD of points
    +   * @param seed the random number generator seed
    +   * @return
    +   */
    +  def init(data: RDD[P], seed: Int): Array[Array[C]] = {
    +    log.debug("k-means parallel on {} points", data.count())
    +
    +    // randomly select one center per run, putting each into a separate array buffer
    +    val sample = data.takeSample(true, runs, seed).toSeq.map(pointOps.pointToCenter)
    +    val centers: Array[ArrayBuffer[C]] = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
    +
    +    // add at most 2k points per step
    +    for (step <- 0 until initializationSteps) {
    +      if (log.isInfoEnabled) showCenters(centers, step)
    +      val centerArrays = centers.map { x: ArrayBuffer[C] => x.toArray }
    +      val bcCenters = data.sparkContext.broadcast(centerArrays)
    +      for ((r, p) <- choose(data, seed, step, bcCenters)) {
    +        centers(r) += pointOps.pointToCenter(p)
    +      }
    +      bcCenters.unpersist()
    +    }
    +
    +    val bcCenters = data.sparkContext.broadcast(centers.map(_.toArray))
    +    val result = finalCenters(data, bcCenters, seed)
    +    bcCenters.unpersist()
    +    result
    +  }
    +
    +  def showCenters(centers: Array[ArrayBuffer[C]], step: Int) {
    +    log.info("step {}", step)
    +    for (run <- 0 until runs) {
    +      log.info("step: run {} has {} centers", run, centers(run).length)
    +    }
    +  }
    +
    +  /**
    +   * Randomly choose at most 2 * k  additional cluster centers by weighting them by their distance
    +   * to the current closest cluster
    +   *
    +   * @param data  the RDD of points
    +   * @param seed  random generator seed
    +   * @param step  which step of the selection process
    +   * @return  array of (run, point)
    +   */
    +  def choose(data: RDD[P], seed: Int, step: Int, bcCenters: Broadcast[Array[Array[C]]])
    +  : Array[(Int, P)] = {
    +    // compute the weighted distortion for each run
    +    val sumCosts = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          (r, point.weight * pointOps.pointCost(centers(r), point))
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    // choose points in proportion to ratio of weighted cost to weighted distortion
    +    data.mapPartitionsWithIndex {
    +      (index, points: Iterator[P]) =>
    +        val centers = bcCenters.value
    +        val range = 0 until runs
    +        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
    +        points.flatMap { p =>
    +          range.filter { r =>
    +            rand.nextDouble() < 2.0 * p.weight * pointOps.pointCost(centers(r), p) * k / sumCosts(r)
    +          }.map((_, p))
    +        }
    +    }.collect()
    +  }
    +
    +  /**
    +   * Reduce sets of candidate cluster centers to at most k points per set using KMeansPlusPlus.
    +   * Weight the points by the distance to the closest cluster center.
    +   *
    +   * @param data  original points
    +   * @param bcCenters  array of sets of candidate centers
    +   * @param seed  random number seed
    +   * @return  array of sets of cluster centers
    +   */
    +  def finalCenters(data: RDD[P], bcCenters: Broadcast[Array[Array[C]]], seed: Int)
    +  : Array[Array[C]] = {
    +    // for each (run, cluster) compute the sum of the weights of the points in the cluster
    +    val weightMap = data.flatMap {
    +      point =>
    +        val centers = bcCenters.value
    +        for (r <- 0 until runs) yield {
    +          ((r, pointOps.findClosest(centers(r), point)._1), point.weight)
    +        }
    +    }.reduceByKey(_ + _).collectAsMap()
    +
    +    val centers = bcCenters.value
    +    val kmeansPlusPlus = new KMeansPlusPlus(pointOps)
    +    val trackingKmeans = new MultiKMeans(pointOps, 30)
    +    val finalCenters = (0 until runs).map {
    +      r =>
    +        val myCenters = centers(r).toArray
    +        log.info("run {} has {} centers", r, myCenters.length)
    +        val weights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), Zero)).toArray
    +        val kx = if (k > myCenters.length) myCenters.length else k
    --- End diff --
    
    Checks for the degenerate case in which fewer than k candidate centers are available.
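
A minimal sketch of the guard this comment describes, assuming plain `Double` weights in place of the PR's `Zero` constant. The object and method names (`FinalCentersSketch`, `weightsFor`, `effectiveK`) are hypothetical and do not appear in the patch.

```scala
object FinalCentersSketch {
  // Weight of each candidate center for run `r`.
  // Centers that attracted no points are missing from the map and get 0.0.
  def weightsFor(r: Int, numCenters: Int, weightMap: Map[(Int, Int), Double]): Array[Double] =
    Array.tabulate(numCenters)(i => weightMap.getOrElse((r, i), 0.0))

  // Never request more final centers than there are candidates.
  def effectiveK(k: Int, numCenters: Int): Int = math.min(k, numCenters)
}
```

With three candidates and `k = 5`, `effectiveK` returns 3, so the final reduction is asked for only as many centers as exist.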


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55995342
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20523/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57238377
  
    Will do
    
    > On Sep 29, 2014, at 2:32 PM, Xiangrui Meng <no...@github.com> wrote:
    > 
    > @derrickburns Could you merge the PR cleanly? For the closure serialization, just go with the spores style: assign the class member to a val and reference the val in the closure. Creating a new class with a single object is not necessary.
    > 
    > For randomization, we should try to make the algorithm deterministic given a random seed. Then we can fix the random seed for tests.
    > 
    > —
    > Reply to this email directly or view it on GitHub.
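
The "assign the class member to a val" advice quoted above can be illustrated without Spark. This is a sketch only: `Scaler` and `ClosureDemo` are hypothetical names, and plain Java serialization stands in for Spark's closure serializer.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a clusterer class that is not itself serializable.
class Scaler(val factor: Double) {
  // Reads `factor` through the enclosing instance, so the closure captures `this`.
  def capturingThis: Double => Double = x => x * factor

  // Copies the member into a local val first, so the closure captures only a Double.
  def capturingVal: Double => Double = {
    val localFactor = factor
    x => x * localFactor
  }
}

object ClosureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under these assumptions the first closure should fail to serialize because it drags the whole `Scaler` along, while the second carries only the copied value.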




[GitHub] spark pull request: This commit addresses SPARK-3218, SPARK-3219, ...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55829039
  
    Hey @derrickburns, I'm not a Spark project maintainer, but just a heads up, you may want to run `dev/run-tests` locally to make sure this patch passes all the style and license tests. It looks like, at the very least, some of the patched files are now missing Apache license headers. `dev/run-tests` will highlight which ones for you.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57696626
  
    I will close this pull request and create another.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55858790
  
    @derrickburns I think these notes can go in code comments? (They each generate their own email too.)
    
    This is also a big-bang change covering several issues, some of which seem like more focused bug fixes or improvements. I would think it would be easier to break this down further if possible, and get in clear easy changes first.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55977249
  
    this is ok to test




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57028022
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20878/




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56787244
  
    @derrickburns Maybe Jenkins didn't clean the working directory. I will test it locally, while asking Jenkins to retry.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-56787087
  
    test this please




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57020077
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20877/consoleFull) for   PR 2419 at commit [`481e5ec`](https://github.com/apache/spark/commit/481e5ec48fad87918f9e2a84871cf37b6af79b5d).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!





[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55992881
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20523/consoleFull) for   PR 2419 at commit [`d6c33e8`](https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by nchammas <gi...@git.apache.org>.
Github user nchammas commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55983870
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640286
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala ---
    @@ -75,7 +75,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
     
         // Make sure code runs.
         var model = KMeans.train(data, k=2, maxIterations=1)
    -    assert(model.clusterCenters.size === 2)
    +    assert(model.clusterCenters.size === 1)
    --- End diff --
    
    This change is the result of not keeping empty clusters.




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-57020801
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20878/consoleFull) for   PR 2419 at commit [`209b034`](https://github.com/apache/spark/commit/209b0341b623f38bcf2c900aafbf6a5574779b4d).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2419#discussion_r17640018
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala ---
    @@ -1,127 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.spark.mllib.clustering
    -
    -import scala.util.Random
    -
    -import breeze.linalg.{Vector => BV, DenseVector => BDV, norm => breezeNorm}
    -
    -import org.apache.spark.Logging
    -
    -/**
    - * An utility object to run K-means locally. This is private to the ML package because it's used
    - * in the initialization of KMeans but not meant to be publicly exposed.
    - */
    -private[mllib] object LocalKMeans extends Logging {
    -
    -  /**
    -   * Run K-means++ on the weighted point set `points`. This first does the K-means++
    -   * initialization procedure and then rounds of Lloyd's algorithm.
    -   */
    -  def kMeansPlusPlus(
    -      seed: Int,
    -      points: Array[BreezeVectorWithNorm],
    -      weights: Array[Double],
    -      k: Int,
    -      maxIterations: Int
    -  ): Array[BreezeVectorWithNorm] = {
    -    val rand = new Random(seed)
    -    val dimensions = points(0).vector.length
    -    val centers = new Array[BreezeVectorWithNorm](k)
    -
    -    // Initialize centers by sampling using the k-means++ procedure.
    -    centers(0) = pickWeighted(rand, points, weights).toDense
    -    for (i <- 1 until k) {
    -      // Pick the next center with a probability proportional to cost under current centers
    -      val curCenters = centers.view.take(i)
    -      val sum = points.view.zip(weights).map { case (p, w) =>
    -        w * KMeans.pointCost(curCenters, p)
    -      }.sum
    -      val r = rand.nextDouble() * sum
    -      var cumulativeScore = 0.0
    -      var j = 0
    -      while (j < points.length && cumulativeScore < r) {
    -        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
    -        j += 1
    -      }
    -      if (j == 0) {
    -        logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
    -          s" Using duplicate point for center k = $i.")
    -        centers(i) = points(0).toDense
    -      } else {
    -        centers(i) = points(j - 1).toDense
    -      }
    -    }
    -
    -    // Run up to maxIterations iterations of Lloyd's algorithm
    -    val oldClosest = Array.fill(points.length)(-1)
    -    var iteration = 0
    -    var moved = true
    -    while (moved && iteration < maxIterations) {
    -      moved = false
    -      val counts = Array.fill(k)(0.0)
    --- End diff --
    
    Sizing this array by k is wrong. It should be sized by the number of cluster centers, which may differ from k.
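
A sketch of one Lloyd update step with accumulators sized by the number of centers rather than by `k`. It uses plain `Array[Double]` points and squared Euclidean distance instead of the PR's `PointOps` types, and `LloydStep` is a hypothetical name. Dropping empty clusters follows the behavior described for this pull request.

```scala
object LloydStep {
  type Point = Array[Double]

  def squaredDistance(a: Point, b: Point): Double = {
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      val d = a(i) - b(i)
      sum += d * d
      i += 1
    }
    sum
  }

  // Index of the center nearest to p (the first one wins on ties).
  def closest(centers: Array[Point], p: Point): Int =
    centers.indices.minBy(i => squaredDistance(centers(i), p))

  // One update step. Assumes at least one center.
  // Accumulators are sized by centers.length, not by the requested k,
  // and clusters that received no points are dropped instead of refilled.
  def step(centers: Array[Point], points: Seq[Point]): Array[Point] = {
    val dim = centers.head.length
    val sums = Array.fill(centers.length)(new Array[Double](dim))
    val counts = new Array[Double](centers.length)
    for (p <- points) {
      val c = closest(centers, p)
      counts(c) += 1.0
      var i = 0
      while (i < dim) {
        sums(c)(i) += p(i)
        i += 1
      }
    }
    centers.indices.collect {
      case c if counts(c) > 0.0 => sums(c).map(_ / counts(c))
    }.toArray
  }
}
```

Because every array is indexed by `centers.length`, the step keeps working after an earlier iteration has reduced the number of centers below `k`.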




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55996597
  
    @mengxr, can someone help me to understand the test failure?
    
    The test  "task size should be small in both training and prediction" in
    class ClusterKmeansSuite fails with a NullPointerException.
    I do not understand the failure.
    
    None of the other tests in KMeansSuite.scala appears to fail.
    
    Please advise.
    
    On Wed, Sep 17, 2014 at 10:05 PM, Apache Spark QA <no...@github.com>
    wrote:
    
    > QA tests have finished
    > <https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20523/consoleFull>
    > for PR 2419 at commit d6c33e8
    > <https://github.com/apache/spark/commit/d6c33e8756416e6515588724c06230c6fa5069bb>
    > .
    >
    >    - This patch *fails* unit tests.
    >    - This patch merges cleanly.
    >    - This patch adds no public classes.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2419#issuecomment-55995342>.
    >




[GitHub] spark pull request: [SPARK-3218, SPARK-3219, SPARK-3261, SPARK-342...

Posted by derrickburns <gi...@git.apache.org>.
Github user derrickburns commented on the pull request:

    https://github.com/apache/spark/pull/2419#issuecomment-55971460
  
    To understand and evaluate this pull request, I would suggest that a reviewer do the following:
    1) Look at the `PointOps` trait and its `FastEuclideanOps` implementation to understand its purpose.
    2) Look at the `MultiKMeans` class that implements the iterations of Lloyd's algorithm. Confirm that this operates as you would expect.
    3) Look at the `KMeansRandom` class. Confirm that it creates `runs` sets of `k` random cluster centers each.
    4) Look at the `KMeansParallel` class. Confirm that it implements the K-Means || algorithm and creates `runs` sets of at most `k` cluster centers.
    5) Look at the `KMeansPlusPlus` class. Confirm that it implements the K-Means++ algorithm.
    
    If the reviewer is familiar with the K-Means, K-Means ||, and K-Means++ algorithms, then I suspect that the code can be thoroughly reviewed in a couple of hours.
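
For a reviewer working through step 5, here is a small local sketch of weighted K-Means++ seeding. It is not the PR's `KMeansPlusPlus` class: it assumes plain `Array[Double]` points with squared Euclidean cost, and the names `KMeansPlusPlusSketch`, `pickWeighted`, and `initialCenters` are chosen for illustration. It stops early rather than duplicating a center, and it is deterministic for a fixed seed.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object KMeansPlusPlusSketch {
  type Point = Array[Double]

  def squaredDistance(a: Point, b: Point): Double =
    a.indices.map { i => val d = a(i) - b(i); d * d }.sum

  // Cost of a point: squared distance to its nearest chosen center.
  def pointCost(centers: Seq[Point], p: Point): Double =
    centers.map(squaredDistance(_, p)).min

  // Pick an index with probability proportional to scores(i).
  // Returns -1 when every score is zero (nothing left to pick).
  def pickWeighted(rand: Random, scores: Array[Double]): Int = {
    val total = scores.sum
    if (total <= 0.0) -1
    else {
      val r = rand.nextDouble() * total
      var j = 0
      var cumulative = scores(0)
      while (cumulative <= r && j < scores.length - 1) {
        j += 1
        cumulative += scores(j)
      }
      j
    }
  }

  // Choose up to k centers; fewer are returned if distinct points run out.
  def initialCenters(points: Array[Point], weights: Array[Double], k: Int, seed: Long): Array[Point] = {
    val rand = new Random(seed)
    val centers = ArrayBuffer.empty[Point]
    var next = pickWeighted(rand, weights)
    while (next >= 0 && centers.length < k) {
      centers += points(next)
      // Re-score every point by its weighted cost under the centers chosen so far.
      val scores = points.indices.map(i => weights(i) * pointCost(centers.toSeq, points(i))).toArray
      next = pickWeighted(rand, scores)
    }
    centers.toArray
  }
}
```

Calling `initialCenters` twice with the same seed yields the same centers, which is the property needed to fix the seed in tests.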


