Posted to reviews@spark.apache.org by freeman-lab <gi...@git.apache.org> on 2014/10/25 10:12:48 UTC

[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

GitHub user freeman-lab opened a pull request:

    https://github.com/apache/spark/pull/2942

    Streaming KMeans [MLLIB][SPARK-3254]

    This adds a Streaming KMeans algorithm to MLlib. It uses an update rule that generalizes the mini-batch KMeans update to incorporate a decay factor, which allows past data to be forgotten. The decay factor can be specified explicitly, or via a more intuitive "fractional decay" setting, in units of either data points or batches.
    
    The PR includes:
    - StreamingKMeans algorithm with decay factor settings
    - Usage example
    - Additions to documentation clustering page
    - Unit tests of basic behavior and decay behaviors
    
    @tdas @mengxr @rezazadeh
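
For readers following the thread, the decayed update described above can be sketched in plain Scala, without Spark. This is only an illustration under stated assumptions: the names `DecayedUpdateSketch`, `DecayUnits`, `Cluster`, and `update` are hypothetical and are not part of the PR, and the formula mirrors the "batches" and "points" cases as they appear in the diff quoted later in this thread.

```scala
object DecayedUpdateSketch {
  /** Unit in which the decay factor is applied, mirroring "batches" and "points" in the diff. */
  sealed trait DecayUnits
  case object Batches extends DecayUnits
  case object Points extends DecayUnits

  /** One cluster: its centroid and the number of points assigned to it so far. */
  final case class Cluster(center: Vector[Double], count: Long)

  /** Fold one batch (the mean of its points and their count) into an existing cluster. */
  def update(
      old: Cluster,
      batchMean: Vector[Double],
      batchCount: Long,
      a: Double,
      units: DecayUnits): Cluster = {
    require(old.center.length == batchMean.length, "dimension mismatch")
    require(a >= 0.0 && a <= 1.0, "decay factor must lie in [0, 1]")
    if (batchCount == 0L) {
      old // no points were assigned to this cluster in the batch
    } else {
      // Discount applied to the old count: a per batch, or a^m for a batch of m points.
      val discount = units match {
        case Batches => a
        case Points  => math.pow(a, batchCount.toDouble)
      }
      // Weight of the new batch relative to the discounted history.
      val lambda = batchCount / (discount * old.count + batchCount)
      val center = old.center.zip(batchMean).map { case (c, x) => c + (x - c) * lambda }
      Cluster(center, old.count + batchCount)
    }
  }
}
```

With `a = 1.0` the sketch reduces to the count-weighted mean of the mini-batch rule; with `a = 0.0` the updated centroid is simply the mean of the current batch.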

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/freeman-lab/spark streaming-kmeans

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2942.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2942
    
----
commit b93350fce951d47e50fafda5bf066d5b29fe9803
Author: freeman <th...@gmail.com>
Date:   2014-08-28T20:32:05Z

    Streaming KMeans with decay
    
    - Used trainOn and predictOn pattern, similar to
    StreamingLinearAlgorithm
    - Decay factor can be set explicitly, or via fractional decay
    parameters expressed in units of number of batches, or number of points
    - Unit tests for basic functionality and decay settings

commit 9fd9c155e956f274237ecfda69a83576975ad8a0
Author: freeman <th...@gmail.com>
Date:   2014-10-22T05:05:43Z

    Merge remote-tracking branch 'upstream/master' into streaming-kmeans

commit a0fd79017e74d3e4d519f507573e74d453aef0ee
Author: freeman <th...@gmail.com>
Date:   2014-10-25T05:14:33Z

    Merge remote-tracking branch 'upstream/master' into streaming-kmeans

commit b5b5f8d41dab067c0ed5b5b9de88d7613dda84ef
Author: freeman <th...@gmail.com>
Date:   2014-10-25T06:17:56Z

    Add better documentation

commit f33684b2e59593a71c577e48c4ab1356444c84d6
Author: freeman <th...@gmail.com>
Date:   2014-10-25T08:03:35Z

    Add explanation and example to docs

commit 5db7074cab7663cc88feeda8f61212ade48ca9a0
Author: freeman <th...@gmail.com>
Date:   2014-10-25T08:03:51Z

    Example usage for StreamingKMeans

commit 9facbe3ecbc14679b83053fa5f471dd50ab68fbd
Author: freeman <th...@gmail.com>
Date:   2014-10-25T08:04:11Z

    Bug fix

commit ea9877c242b06ba690e6237f095137efc2f76faa
Author: freeman <th...@gmail.com>
Date:   2014-10-25T08:04:24Z

    More documentation

commit 2086bdc56a29f63a1c2143f88303e1296df45260
Author: freeman <th...@gmail.com>
Date:   2014-10-25T08:04:31Z

    Log cluster center updates

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61234517
  
    @mengxr I implemented the new parameterization (and tried to make the docs on it more intuitive), see what you think!




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60876198
  
      [Test build #22428 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22428/consoleFull) for   PR 2942 at commit [`9f7aea9`](https://github.com/apache/spark/commit/9f7aea9eac3c64f646d1783909e0e2d155663399).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60875441
  
      [Test build #22426 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22426/consoleFull) for   PR 2942 at commit [`374a706`](https://github.com/apache/spark/commit/374a706dd9e4ab064a2003d9440b91ac930e86c4).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19492141
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial centers explicitly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Initialize random centers, requiring only the number of dimensions. */
    +  def setRandomCenters(d: Int): this.type = {
    --- End diff --
    
    We should add `seed` as an argument. This method needs `@param` in the doc or change `d` to `dim`.
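
One way the suggestion above could look is sketched below in plain Scala. It is a hypothetical illustration, not the code in the PR: the object name `RandomCentersSketch`, the function `randomCenters`, the parameter names, and the choice of standard Gaussian draws are all assumptions, since the body of `setRandomCenters` is not shown in the quoted diff.

```scala
import scala.util.Random

object RandomCentersSketch {
  /**
   * Draw initial cluster centers from a standard normal distribution.
   *
   * @param dim  number of dimensions of each center
   * @param k    number of centers to generate
   * @param seed seed for the random number generator, so that runs are reproducible
   */
  def randomCenters(dim: Int, k: Int, seed: Long): Array[Array[Double]] = {
    require(dim > 0 && k > 0, "dim and k must be positive")
    // A dedicated generator avoids depending on the global scala.util.Random state.
    val rng = new Random(seed)
    Array.fill(k)(Array.fill(dim)(rng.nextGaussian()))
  }
}
```

Passing the seed explicitly makes tests of the initial state deterministic: two calls with the same `dim`, `k`, and `seed` return the same centers.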




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490486
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    --- End diff --
    
    use a better name, e.g., `decayFactor`?




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by anantasty <gi...@git.apache.org>.
Github user anantasty commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60554980
  
    Should we create another PR for the python bindings/example?




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61241985
  
      [Test build #22607 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22607/consoleFull) for   PR 2942 at commit [`0411bf5`](https://github.com/apache/spark/commit/0411bf563bf2296d3a56b1a60bb5e4e1f2789981).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61356950
  
      [Test build #22677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22677/consoleFull) for   PR 2942 at commit [`b2e5b4a`](https://github.com/apache/spark/commit/b2e5b4a167e0e5835f3518d2b68e4063c3f9c955).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490587
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    --- End diff --
    
    ditto: this overwrites `decayFactor` and `units`
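
    The overwrite described in this comment can be reproduced with a small stand-in class. This is only a sketch: `DecaySettings` and `OverwriteDemo` are hypothetical names, not part of MLlib, and the body of `setDecayFractionBatches` is copied from the diff quoted above.

```scala
// Hypothetical stand-in for the builder in the quoted diff; not the MLlib class.
class DecaySettings(var a: Double = 1.0, var units: String = "batches") {

  def setDecayFactor(a: Double): this.type = {
    this.a = a
    this
  }

  def setUnits(units: String): this.type = {
    // require throws IllegalArgumentException, like the explicit throw in the diff
    require(units == "batches" || units == "points", "Invalid units for decay: " + units)
    this.units = units
    this
  }

  // Same body as setDecayFractionBatches in the diff: it replaces both fields.
  def setDecayFractionBatches(q: Double): this.type = {
    this.a = math.log(1 - q) / math.log(0.5)
    this.units = "batches"
    this
  }
}

object OverwriteDemo {
  // The explicit factor (0.9) and units ("points") are silently replaced
  // by the last setter in the chain.
  val settings: DecaySettings = new DecaySettings()
    .setDecayFactor(0.9)
    .setUnits("points")
    .setDecayFractionBatches(0.75)
}
```

    Because the last setter wins, the order of calls in the builder chain changes the result, which appears to be the concern raised here.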


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19446715
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_{t+1} = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_{t+1} = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    --- End diff --
    
    indent of 4 spaces?




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490241
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:
    +
    +`\begin{equation}
    +    c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
    +\end{equation}`
    +`\begin{equation}
    +    n_{t+1} = n_t + m_t  
    +\end{equation}`
    +
    +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an exponentially-weighted moving average.
    --- End diff --
    
    1. line too wide
    2. `alpha` is a constant, independent of `n_t` and `m_t`. So we treat either `batch` or `point` as a time unit. Using `point` as the time unit is not mentioned here. It is okay to put a link to the generated Scala doc.
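
    For reference, the update rule quoted in the diff above can be written out with `alpha` held constant, as this comment describes. This is a plain-Scala sketch with no Spark or Breeze dependency; `DecayedUpdate` and `updateCenter` are illustrative names, not MLlib API.

```scala
object DecayedUpdate {
  /**
   * c_{t+1} = (c_t * n_t * alpha + x_t * m_t) / (n_t * alpha + m_t)
   *
   * c: previous center, n: points seen so far,
   * x: center of the current batch, m: points in the current batch.
   */
  def updateCenter(
      c: Array[Double],
      n: Double,
      x: Array[Double],
      m: Double,
      alpha: Double): Array[Double] = {
    require(c.length == x.length, "centers must have the same dimension")
    val denom = n * alpha + m
    require(denom > 0, "at least one of n * alpha and m must be positive")
    // element-wise weighted average of the old and new centers
    c.zip(x).map { case (ci, xi) => (ci * n * alpha + xi * m) / denom }
  }
}
```

    With `alpha = 1.0` the result is the running mean over all points; with `alpha = 0.0` it equals the current batch center `x`.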




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490145
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:
    --- End diff --
    
    1. line too wide
    2. `KMeans` -> `k-means`




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60477107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22209/




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60875506
  
      [Test build #22426 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22426/consoleFull) for PR 2942 at commit [`374a706`](https://github.com/apache/spark/commit/374a706dd9e4ab064a2003d9440b91ac930e86c4).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61358857
  
    LGTM. Merged into master. Thanks for adding streaming k-means!




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60806389
  
    @freeman-lab I made a quick pass over the implementation. It looks great! I will check the math and the test code with someone who knows everything about streaming k-means later today :)




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490351
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    --- End diff --
    
    remove this empty line to make the real doc show up in the generated doc




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490523
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_{t+1} = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_{t+1} = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    --- End diff --
    
    It is not clear from the name that `units` is to control the decay. I don't have good suggestions, maybe `timeUnit`, which takes either `"batch"` or `"point"`.
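
    To make the role of this parameter concrete: in the quoted `update` method it only selects how the decay factor `a` is turned into a per-batch discount on the old count. The sketch below restates those two `case` branches in plain Scala. `DecayDiscount`, `discount`, and `updateWeight` are hypothetical names, and the strings are the ones used in the diff (`"batches"`, `"points"`), not the `"batch"`/`"point"` spelling suggested here.

```scala
object DecayDiscount {
  /** Discount applied to the old count for one batch of m points. */
  def discount(a: Double, timeUnit: String, m: Long): Double = timeUnit match {
    case "batches" => a                        // one time step per batch
    case "points"  => math.pow(a, m.toDouble)  // one time step per point
    case other =>
      throw new IllegalArgumentException("Invalid units for decay: " + other)
  }

  /** Weight given to the new batch center, as in the diff's `decayFactor` value. */
  def updateWeight(a: Double, timeUnit: String, oldCount: Long, newCount: Long): Double =
    newCount.toDouble / (discount(a, timeUnit, newCount) * oldCount + newCount)
}
```

    Read this way, the parameter names the time unit over which `a` is applied, which is the motivation for a name like `timeUnit`.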




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61319592
  
    @freeman-lab I made some changes: https://github.com/freeman-lab/spark/pull/1 , which includes the following:
    
    1. discount on previous counts
    2. detecting dying clusters
    3. use BLAS if possible
    
    If the update looks good to you, could you merge that one? Thanks!
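
    The linked change is not quoted in this thread, so the following is only one plausible reading of item 1 ("discount on previous counts"): the stored count is itself decayed by `alpha` before the new batch is added, so it matches the weight used in the center update. `DiscountedCounts`, `updateCount`, and `run` are hypothetical names.

```scala
object DiscountedCounts {
  /** Assumed rule: n_{t+1} = n_t * alpha + m_t (instead of n_t + m_t). */
  def updateCount(n: Double, m: Double, alpha: Double): Double = n * alpha + m

  /** Fold a sequence of per-batch counts into a single discounted count. */
  def run(batchCounts: Seq[Double], alpha: Double): Double =
    batchCounts.foldLeft(0.0)((n, m) => updateCount(n, m, alpha))
}
```

    Under this assumption, `alpha = 1.0` recovers the plain running total, while `alpha < 1.0` keeps the count bounded when batch sizes are bounded.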




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490470
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_{t+1} = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_{t+1} = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    --- End diff --
    
    `{...}` -> `(...)`
    
    `.collectAsMap().toArray` -> `.collect()`
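
    The second suggestion works because a reduce-by-key leaves exactly one record per key, so going through a map before building the array adds nothing. The sketch below shows the same per-cluster aggregation on plain Scala collections rather than an RDD. `ClusterStats` and `pointStats` are illustrative names, and `mergeContribs` here is a non-mutating variant of the one in the diff.

```scala
object ClusterStats {
  // (sum of points, number of points) for one cluster
  type WeightedPoint = (Array[Double], Long)

  // Add two partial sums and counts without mutating either argument.
  def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint =
    (p1._1.zip(p2._1).map { case (u, v) => u + v }, p1._2 + p2._2)

  // Local analog of closest.reduceByKey(mergeContribs).collect():
  // one (clusterIndex, (sum, count)) entry per distinct key.
  def pointStats(closest: Seq[(Int, WeightedPoint)]): Array[(Int, WeightedPoint)] =
    closest
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).reduce(mergeContribs)) }
      .toArray
      .sortBy(_._1) // sorted only to make the output order deterministic
}
```

    Dividing each sum by its count then gives the per-batch centroid used in the update loop.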




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490345
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    --- End diff --
    
    move this import group after scala's imports




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490467
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfullness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+t = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    --- End diff --
    
    use JavaDoc for methods




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61356758
  
    @mengxr great updates! LGTM. Just need to update the doc/examples in a couple of places, I think.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2942




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61354018
  
      [Test build #22673 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22673/consoleFull) for   PR 2942 at commit [`078617c`](https://github.com/apache/spark/commit/078617c167b5b2b698193cddfd567d60637ba906).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490261
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:
    +
    +`\begin{equation}
    +    c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
    +\end{equation}`
    +`\begin{equation}
    +    n_{t+1} = n_t + m_t  
    +\end{equation}`
    +
    +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an expontentially-weighted moving average.
    +
    +### Examples
    +
    +This example shows how to estimate clusters on streaming data.
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala" markdown="1">
    +
    +First we import the neccessary classes.
    +
    +{% highlight scala %}
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.clustering.StreamingKMeans
    +
    +{% endhighlight %}
    +
    +Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data. 
    +
    +{% highlight scala %}
    +
    +val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
    +val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
    +
    +{% endhighlight %}
    +
    +We create a model with random clusters and specify the number of clusters to find
    +
    +{% highlight scala %}
    +
    +val numDimensions = 3
    +val numClusters = 2
    +val model = new StreamingKMeans()
    +    .setK(numClusters)
    --- End diff --
    
    2-space indentation




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60794676
  
    @anantasty This PR is still in review. If you are interested in Python bindings for streaming algorithms, could you help add one for StreamingLinearRegression? Thanks!
    
    https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490527
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfullness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+t = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    --- End diff --
    
    It is worth noting that this overwrites `decayFactor` and `units`.
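
A minimal sketch of how such a note could appear in the Scaladoc. The class `DecaySettingsSketch` and the setter `setHalfLifeBatches` are hypothetical names, and the half-life parameterization is an assumption made for illustration; the setter under review is not shown in this diff excerpt and may compute the decay differently.

```scala
class DecaySettingsSketch(var a: Double = 1.0, var units: String = "batches") {

  /** Set the decay factor directly. */
  def setDecayFactor(a: Double): this.type = {
    this.a = a
    this
  }

  /** Set the decay units ("batches" or "points"). */
  def setUnits(units: String): this.type = {
    require(units == "batches" || units == "points", "Invalid units for decay: " + units)
    this.units = units
    this
  }

  /**
   * Set the decay as a half-life measured in batches (hypothetical parameterization).
   * Note: this overwrites any values previously set via setDecayFactor and setUnits.
   */
  def setHalfLifeBatches(halfLife: Double): this.type = {
    this.a = math.pow(0.5, 1.0 / halfLife)
    this.units = "batches"
    this
  }
}
```

With this shape, calling `setDecayFactor(0.9).setUnits("points")` and then `setHalfLifeBatches(1.0)` leaves the decay factor at 0.5 and the units at "batches", which is the behavior the doc comment should make explicit.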




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60796301
  
    @anantasty Agreed, should be separate, but would be very cool to have! Ping me as well, happy to provide feedback.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61358356
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22677/
    Test PASSed.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490476
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfullness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+t = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    --- End diff --
    
    More readable with pattern matching:
    
    ~~~
    pointStats.foreach { case (pred, (mean, count)) =>
    ...
    }
    ~~~
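
For illustration, the loop body might look roughly like this once it is rewritten with pattern matching. This is a standalone sketch with plain arrays instead of MLlib and Breeze vectors, handling only the "batches" case; `UpdateRuleSketch` and `updateCentroid` are hypothetical names. The second tuple element is bound as `sum` rather than `mean`, because the quoted code divides it by the count before using it.

```scala
object UpdateRuleSketch {

  /** Decayed mini-batch update of one centroid; `a` is the decay factor in units of batches. */
  def updateCentroid(
      oldCentroid: Array[Double],
      oldCount: Long,
      sum: Array[Double],
      count: Long,
      a: Double): Array[Double] = {
    // normalized weight given to the current batch
    val lambda = count / (a * oldCount + count)
    oldCentroid.zip(sum).map { case (c, s) => c + (s / count - c) * lambda }
  }

  /** Update centers and counts in place from per-cluster (sum, count) statistics. */
  def update(
      centers: Array[Array[Double]],
      counts: Array[Long],
      pointStats: Array[(Int, (Array[Double], Long))],
      a: Double): Unit = {
    pointStats.foreach { case (cluster, (sum, count)) =>
      // use the old count for the update, then record the new one
      centers(cluster) = updateCentroid(centers(cluster), counts(cluster), sum, count, a)
      counts(cluster) += count
    }
  }
}
```

Naming the tuple fields in the `case` pattern removes the `newP._2._1` style accessors, which is the readability gain being suggested.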




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490284
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:
    +
    +`\begin{equation}
    +    c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
    +\end{equation}`
    +`\begin{equation}
    +    n_{t+1} = n_t + m_t  
    +\end{equation}`
    +
    +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an expontentially-weighted moving average.
    +
    +### Examples
    +
    +This example shows how to estimate clusters on streaming data.
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala" markdown="1">
    +
    +First we import the neccessary classes.
    +
    +{% highlight scala %}
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.clustering.StreamingKMeans
    +
    +{% endhighlight %}
    +
    +Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data. 
    +
    +{% highlight scala %}
    +
    +val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
    +val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
    +
    +{% endhighlight %}
    +
    +We create a model with random clusters and specify the number of clusters to find
    +
    +{% highlight scala %}
    +
    +val numDimensions = 3
    +val numClusters = 2
    +val model = new StreamingKMeans()
    +    .setK(numClusters)
    +    .setDecayFactor(1.0)
    +    .setRandomWeights(numDimensions)
    +
    +{% endhighlight %}
    +
    +Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive.
    +
    +{% highlight scala %}
    +
    +model.trainOn(trainingData)
    +model.predictOn(testData).print()
    --- End diff --
    
    `predictOn` only outputs the prediction, which is not very useful. Maybe we should use `predictOnValues` here.
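
To make the difference concrete, here is a plain-collections analogy, not the streaming API itself. `KeyedPredictionSketch`, `predictAll`, and `predictValues` are hypothetical names standing in for `predictOn` and `predictOnValues`; the nearest-center search is a simple squared-distance scan written only for this example.

```scala
object KeyedPredictionSketch {

  // Index of the nearest center by squared Euclidean distance
  def predict(centers: Array[Array[Double]], point: Array[Double]): Int =
    centers.indices.minBy { i =>
      centers(i).zip(point).map { case (c, x) => (c - x) * (c - x) }.sum
    }

  // Analogue of predictOn: the output no longer says which point each index belongs to
  def predictAll(centers: Array[Array[Double]], points: Seq[Array[Double]]): Seq[Int] =
    points.map(predict(centers, _))

  // Analogue of predictOnValues: each prediction stays paired with its key
  def predictValues[K](
      centers: Array[Array[Double]],
      keyed: Seq[(K, Array[Double])]): Seq[(K, Int)] =
    keyed.map { case (key, point) => (key, predict(centers, point)) }
}
```

In the docs example this would presumably mean keying the test stream (for instance by a label or an id) before prediction, so the printed output can be matched back to its inputs.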




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61356547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22673/
    Test PASSed.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by coderxiang <gi...@git.apache.org>.
Github user coderxiang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19446734
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * If 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial centers directly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    --- End diff --
    
    How about `Array.fill[Long](this.k)(0)`?
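
As a quick check of this suggestion, the following plain-Scala sketch compares the form used in the diff with the suggested one. The object name and the value of `k` are assumptions made for illustration.

```scala
object FillSketch {
  // Form used in the diff: build an Array[Int], then convert each element.
  def viaMap(k: Int): Array[Long] = Array.fill(k)(0).map(_.toLong)

  // Suggested form: with the type parameter given, the literal 0 is typed as Long.
  def direct(k: Int): Array[Long] = Array.fill[Long](k)(0)

  def main(args: Array[String]): Unit = {
    val k = 3 // hypothetical number of clusters
    println(viaMap(k).sameElements(direct(k))) // prints "true"
  }
}
```

Both forms produce the same zero-filled `Array[Long]`; the suggested one avoids the intermediate `Array[Int]` and the extra pass over it.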




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490338
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.clustering.StreamingKMeans
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Estimate clusters on one stream of data and make predictions
    + * on another stream, where the data streams arrive as text files
    + * into two different directories.
    + *
    + * The rows of the text files must be vector data in the form
    + * `[x1,x2,x3,...,xn]`
    + * Where n is the number of dimensions. n must be the same for train and test.
    + *
    + * Usage: StreamingKMeans <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
    + *
    + * To run on your local machine using the two directories `trainingDir` and `testDir`,
    + * with updates every 5 seconds, 2 dimensions per data point, and 3 clusters, call:
    + *    $ bin/run-example \
    + *        org.apache.spark.examples.mllib.StreamingKMeans trainingDir testDir 5 3 2
    + *
    + * As you add text files to `trainingDir` the clusters will continuously update.
    + * Anytime you add text files to `testDir`, you'll see predicted labels using the current model.
    + *
    + */
    +object StreamingKMeans {
    +
    +  def main(args: Array[String]) {
    +
    +    if (args.length != 5) {
    +      System.err.println(
    +        "Usage: StreamingKMeans " +
    +          "<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>")
    +      System.exit(1)
    +    }
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeans")
    +    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
    +
    +    val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
    +    val testData = ssc.textFileStream(args(1)).map(Vectors.parse)
    +
    +    val model = new StreamingKMeans()
    +      .setK(args(3).toInt)
    +      .setDecayFactor(1.0)
    +      .setRandomCenters(args(4).toInt)
    +
    +    model.trainOn(trainingData)
    +    model.predictOn(testData).print()
    --- End diff --
    
    Ditto: use `predictOnValues`.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60475562
  
      [Test build #22209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22209/consoleFull) for   PR 2942 at commit [`2086bdc`](https://github.com/apache/spark/commit/2086bdc56a29f63a1c2143f88303e1296df45260).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60880665
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22428/
    Test PASSed.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19454435
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * If 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial centers directly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Initialize random centers, requiring only the number of dimensions. */
    +  def setRandomCenters(d: Int): this.type = {
    +    val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Return the latest model. */
    +  def latestModel(): StreamingKMeansModel = {
    +    model
    +  }
    +
    +  /**
    +   * Update the clustering model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * checks whether the cluster centers have been initialized,
    +   * and updates the model using each batch of data from the stream.
    +   *
    +   * @param data DStream containing vector data
    +   */
    +  def trainOn(data: DStream[Vector]) {
    +    this.isInitialized
    +    data.foreachRDD { (rdd, time) =>
    +      model = model.update(rdd, this.a, this.units)
    +    }
    +  }
    +
    +  /**
    +   * Use the clustering model to make predictions on batches of data from a DStream.
    +   *
    +   * @param data DStream containing vector data
    +   * @return DStream containing predictions
    +   */
    +  def predictOn(data: DStream[Vector]): DStream[Int] = {
    +    this.isInitialized
    +    data.map(model.predict)
    +  }
    +
    +  /**
    +   * Use the model to make predictions on the values of a DStream and carry over its keys.
    +   *
    +   * @param data DStream containing (key, feature vector) pairs
    +   * @tparam K key type
    +   * @return DStream containing the input keys and the predictions as values
    +   */
    +  def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = {
    +    this.isInitialized
    +    data.mapValues(model.predict)
    +  }
    +
    +  /**
    +   * Check whether cluster centers have been initialized.
    +   *
    +   * @return Boolean, True if cluster centers have been initialized
    +   */
    +  def isInitialized: Boolean = {
    +    if (Option(model.clusterCenters) == None) {
    +      logError("Initial cluster centers must be set before starting predictions")
    +      throw new IllegalArgumentException
    --- End diff --
    
    It might be better to remove the `logError` and put the error message in the exception itself.
    
    Also, this is not really an `IllegalArgumentException`; it is more like an `IllegalStateException`.
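
A minimal sketch of what this suggestion could look like, written as a simplified, hypothetical stand-in for the class under review (the class name `InitCheckSketch` and the method name `assertInitialized` are assumptions, not part of the pull request):

```scala
// Hypothetical, simplified stand-in for the class under review.
class InitCheckSketch {
  // null until centers are set, mirroring the initial model state in the diff.
  private var centers: Array[Array[Double]] = null

  def setInitialCenters(c: Array[Array[Double]]): this.type = {
    centers = c
    this
  }

  // Throws IllegalStateException carrying the message, instead of logging
  // an error and throwing an exception without a message.
  def assertInitialized(): Unit = {
    if (centers == null) {
      throw new IllegalStateException(
        "Initial cluster centers must be set before starting predictions")
    }
  }
}

object InitCheckSketch {
  def main(args: Array[String]): Unit = {
    val sketch = new InitCheckSketch
    try sketch.assertInitialized()
    catch { case e: IllegalStateException => println(e.getMessage) }
    sketch.setInitialCenters(Array(Array(0.0, 0.0))).assertInitialized()
    println("initialized")
  }
}
```

`IllegalStateException` fits here because the failure depends on the order in which methods were called on the object, not on the value of any argument passed to the failing method.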




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60850276
  
    @mengxr @coderxiang @rxin Thanks all for the feedback! I'm implementing these changes.
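
For readers following the thread, the decayed update rule in the "batches" branch of the quoted diff can be sketched in plain Scala. Algebraically, that branch computes c_t+1 = (a * n_t * c_t + m_t * x_t) / (a * n_t + m_t). The sketch below is an illustration under assumptions: it uses a one-dimensional centroid, and the names `Cluster`, `update`, and `lambda` are hypothetical.

```scala
object DecayUpdateSketch {
  // One cluster's state: centroid c_t and count n_t (1-D for brevity).
  final case class Cluster(centroid: Double, count: Long)

  // Mirrors the "batches" branch in the diff: the old count is scaled by `a`
  // before weighting. With a = 1 this is the plain mini-batch rule; with
  // a = 0 the new centroid equals the batch centroid. Assumes batchCount > 0.
  def update(old: Cluster, batchCentroid: Double, batchCount: Long, a: Double): Cluster = {
    val lambda = batchCount / (a * old.count + batchCount)
    val updated = old.centroid + (batchCentroid - old.centroid) * lambda
    // As in the diff, the stored count is the undecayed running total.
    Cluster(updated, old.count + batchCount)
  }

  def main(args: Array[String]): Unit = {
    val old = Cluster(centroid = 0.0, count = 10L)
    println(update(old, batchCentroid = 4.0, batchCount = 10L, a = 1.0)) // prints "Cluster(2.0,20)"
    println(update(old, batchCentroid = 4.0, batchCount = 10L, a = 0.0)) // prints "Cluster(4.0,20)"
  }
}
```

With equal old and new counts, `a = 1.0` lands halfway between the old centroid and the batch centroid, while `a = 0.0` discards the past entirely.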




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19454416
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * If 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial explicitly directly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    --- End diff --
    
    Actually since the initial value is 0L, you can just do
    ```scala
    new Array[Long](this.k)
    ```
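
A minimal sketch of why the suggestion is safe, assuming only standard Scala array semantics: `new Array[Long](k)` is already zero-filled, so it matches the `fill`-then-`map` version without the intermediate `Array[Int]`. The object and method names below are hypothetical and not part of the PR.

```scala
object ZeroCounts {
  // Version from the diff: builds an Array[Int] of zeros, then converts each element to Long.
  def viaFill(k: Int): Array[Long] = Array.fill(k)(0).map(_.toLong)

  // Suggested version: a freshly allocated Array[Long] is zero-initialized by the JVM.
  def direct(k: Int): Array[Long] = new Array[Long](k)
}
```

Both helpers return an array of `k` zeros; the second allocates once and skips the conversion pass.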


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61358352
  
      [Test build #22677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22677/consoleFull) for   PR 2942 at commit [`b2e5b4a`](https://github.com/apache/spark/commit/b2e5b4a167e0e5835f3518d2b68e4063c3f9c955).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490369
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    --- End diff --
    
    `KMeans` -> `k-means`




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60795975
  
    It should be in a separate JIRA (and hence a separate PR). Please create a JIRA for `StreamingLinearRegression` and ping me there. Thanks!




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60875507
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22426/
    Test FAILed.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60873448
  
    Had an offline discussion with @freeman-lab . We decided to introduce the concept of `timeUnit` to describe decay. A `timeUnit` (like a second) could be either a `batch` or a `point` (later we might add real "second"). Suppose that we have some points at time `t`. The decay factor `alpha` is the discount we take when we calculate the contribution from those points at time `t+1`. The half life `h` is defined such that at time `t + h` the discount becomes 0.5. So `alpha^h = 0.5` regardless of which `timeUnit` we use.
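
The relation `alpha^h = 0.5` can be worked through in a short sketch. This is only an illustration of the arithmetic described above, not the API that the PR adopts; `HalfLifeDecay`, `decayFactor`, and `discount` are hypothetical names. Solving for the decay factor gives `alpha = exp(ln(0.5) / h)`, where `h` is counted in whichever `timeUnit` (batches or points) is in use.

```scala
object HalfLifeDecay {
  // Solve alpha^h = 0.5 for alpha: alpha = exp(ln(0.5) / h).
  def decayFactor(halfLife: Double): Double = {
    require(halfLife > 0.0, "halfLife must be positive")
    math.exp(math.log(0.5) / halfLife)
  }

  // Discount applied to data that is `elapsed` time units old.
  def discount(alpha: Double, elapsed: Double): Double = math.pow(alpha, elapsed)
}
```

With a half life of 5 batches, for example, `decayFactor(5.0)` is roughly 0.87, and data that is 5 batches old is discounted by one half.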




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60477105
  
      [Test build #22209 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22209/consoleFull) for   PR 2942 at commit [`2086bdc`](https://github.com/apache/spark/commit/2086bdc56a29f63a1c2143f88303e1296df45260).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61356545
  
      [Test build #22673 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22673/consoleFull) for   PR 2942 at commit [`078617c`](https://github.com/apache/spark/commit/078617c167b5b2b698193cddfd567d60637ba906).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61234935
  
      [Test build #22607 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22607/consoleFull) for   PR 2942 at commit [`0411bf5`](https://github.com/apache/spark/commit/0411bf563bf2296d3a56b1a60bb5e4e1f2789981).
     * This patch merges cleanly.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19492205
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfullness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+t = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial explicitly directly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Initialize random centers, requiring only the number of dimensions. */
    +  def setRandomCenters(d: Int): this.type = {
    +    val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Return the latest model. */
    +  def latestModel(): StreamingKMeansModel = {
    +    model
    +  }
    +
    +  /**
    +   * Update the clustering model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * checks whether the cluster centers have been initialized,
    +   * and updates the model using each batch of data from the stream.
    +   *
    +   * @param data DStream containing vector data
    +   */
    +  def trainOn(data: DStream[Vector]) {
    +    this.isInitialized
    +    data.foreachRDD { (rdd, time) =>
    +      model = model.update(rdd, this.a, this.units)
    +    }
    +  }
    +
    +  /**
    +   * Use the clustering model to make predictions on batches of data from a DStream.
    +   *
    +   * @param data DStream containing vector data
    +   * @return DStream containing predictions
    +   */
    +  def predictOn(data: DStream[Vector]): DStream[Int] = {
    +    this.isInitialized
    +    data.map(model.predict)
    +  }
    +
    +  /**
    +   * Use the model to make predictions on the values of a DStream and carry over its keys.
    +   *
    +   * @param data DStream containing (key, feature vector) pairs
    +   * @tparam K key type
    +   * @return DStream containing the input keys and the predictions as values
    +   */
    +  def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = {
    +    this.isInitialized
    +    data.mapValues(model.predict)
    +  }
    +
    +  /**
    +   * Check whether cluster centers have been initialized.
    +   *
    +   * @return Boolean, True if cluster centrs have been initialized
    +   */
    +  def isInitialized: Boolean = {
    --- End diff --
    
    Usually, methods with a name like `isAbc` do not throw an exception. Shall we rename it to `assertInitialized` and have it return `Unit`?
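
A hedged sketch of the naming convention being proposed: a side-effect-free `isInitialized` query that only reports state, and a separate `assertInitialized` that throws and returns `Unit`. The body of the original method is not shown in the quoted diff, so `InitCheckSketch`, its field, and the exception type are assumptions made for illustration only.

```scala
// Hypothetical, simplified stand-in for StreamingKMeans; only the initialization check is modeled.
class InitCheckSketch {
  private var centers: Option[Array[Array[Double]]] = None

  def setInitialCenters(c: Array[Array[Double]]): this.type = {
    centers = Some(c)
    this
  }

  /** Pure query: reports whether centers have been set and never throws. */
  def isInitialized: Boolean = centers.isDefined

  /** Guard: throws if centers have not been set, otherwise returns Unit. */
  def assertInitialized(): Unit = {
    if (!isInitialized) {
      throw new IllegalStateException("Initial cluster centers must be set before training")
    }
  }
}
```

Callers such as `trainOn` or `predictOn` would then invoke the guard for its effect instead of discarding a `Boolean` result.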




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490483
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfullness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+t = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    --- End diff --
    
    This class needs a doc comment.




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19492147
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala ---
    @@ -0,0 +1,246 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import scala.reflect.ClassTag
    +import scala.util.Random._
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext._
    +
    +/**
    + * :: DeveloperApi ::
    + *
    + * StreamingKMeansModel extends MLlib's KMeansModel for streaming
    + * algorithms, so it can keep track of the number of points assigned
    + * to each cluster, and also update the model by doing a single iteration
    + * of the standard KMeans algorithm.
    + *
    + * The update algorithm uses the "mini-batch" KMeans rule,
    + * generalized to incorporate forgetfulness (i.e. decay).
    + * The basic update rule (for each cluster) is:
    + *
    + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t]
    + * n_t+1 = n_t + m_t
    + *
    + * Where c_t is the previously estimated centroid for that cluster,
    + * n_t is the number of points assigned to it thus far, x_t is the centroid
    + * estimated on the current batch, and m_t is the number of points assigned
    + * to that centroid in the current batch.
    + *
    + * This update rule is modified with a decay factor 'a' that scales
    + * the contribution of the clusters as estimated thus far.
    + * If a=1, all batches are weighted equally. If a=0, new centroids
    + * are determined entirely by recent data. Lower values correspond to
    + * more forgetting.
    + *
    + * Decay can optionally be specified as a decay fraction 'q',
    + * which corresponds to the fraction of batches (or points)
    + * after which the past will be reduced to a contribution of 0.5.
    + * This decay fraction can be specified in units of 'points' or 'batches'.
    + * if 'batches', behavior will be independent of the number of points per batch;
    + * if 'points', the expected number of points per batch must be specified.
    + *
    + * Use a builder pattern to construct a streaming KMeans analysis
    + * in an application, like:
    + *
    + *  val model = new StreamingKMeans()
    + *    .setDecayFactor(0.5)
    + *    .setK(3)
    + *    .setRandomCenters(5)
    + *    .trainOn(DStream)
    + *
    + */
    +@DeveloperApi
    +class StreamingKMeansModel(
    +    override val clusterCenters: Array[Vector],
    +    val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging {
    +
    +  // do a sequential KMeans update on a batch of data
    +  def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = {
    +
    +    val centers = clusterCenters
    +    val counts = clusterCounts
    +
    +    // find nearest cluster to each point
    +    val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong)))
    +
    +    // get sums and counts for updating each cluster
    +    type WeightedPoint = (BV[Double], Long)
    +    def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    +      (p1._1 += p2._1, p1._2 + p2._2)
    +    }
    +    val pointStats: Array[(Int, (BV[Double], Long))] =
    +      closest.reduceByKey{mergeContribs}.collectAsMap().toArray
    +
    +    // implement update rule
    +    for (newP <- pointStats) {
    +      // store old count and centroid
    +      val oldCount = counts(newP._1)
    +      val oldCentroid = centers(newP._1).toBreeze
    +      // get new count and centroid
    +      val newCount = newP._2._2
    +      val newCentroid = newP._2._1 / newCount.toDouble
    +      // compute the normalized scale factor that controls forgetting
    +      val decayFactor = units match {
    +        case "batches" =>  newCount / (a * oldCount + newCount)
    +        case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount)
    +      }
    +      // perform the update
    +      val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor
    +      // store the new counts and centers
    +      counts(newP._1) = oldCount + newCount
    +      centers(newP._1) = Vectors.fromBreeze(updatedCentroid)
    +
    +      // display the updated cluster centers
    +      val display = centers(newP._1).size match {
    +        case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...")
    +        case _ => centers(newP._1).toArray.mkString("[", ",", "]")
    +      }
    +      logInfo("Cluster %d updated: %s ".format (newP._1, display))
    +    }
    +    new StreamingKMeansModel(centers, counts)
    +  }
    +
    +}
    +
    +@DeveloperApi
    +class StreamingKMeans(
    +     var k: Int,
    +     var a: Double,
    +     var units: String) extends Logging {
    +
    +  protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null)
    +
    +  def this() = this(2, 1.0, "batches")
    +
    +  /** Set the number of clusters. */
    +  def setK(k: Int): this.type = {
    +    this.k = k
    +    this
    +  }
    +
    +  /** Set the decay factor directly (for forgetful algorithms). */
    +  def setDecayFactor(a: Double): this.type = {
    +    this.a = a
    +    this
    +  }
    +
    +  /** Set the decay units for forgetful algorithms ("batches" or "points"). */
    +  def setUnits(units: String): this.type = {
    +    if (units != "batches" && units != "points") {
    +      throw new IllegalArgumentException("Invalid units for decay: " + units)
    +    }
    +    this.units = units
    +    this
    +  }
    +
    +  /** Set decay fraction in units of batches. */
    +  def setDecayFractionBatches(q: Double): this.type = {
    +    this.a = math.log(1 - q) / math.log(0.5)
    +    this.units = "batches"
    +    this
    +  }
    +
    +  /** Set decay fraction in units of points. Must specify expected number of points per batch. */
    +  def setDecayFractionPoints(q: Double, m: Double): this.type = {
    +    this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m)
    +    this.units = "points"
    +    this
    +  }
    +
    +  /** Specify initial centers directly. */
    +  def setInitialCenters(initialCenters: Array[Vector]): this.type = {
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    +    this.model = new StreamingKMeansModel(initialCenters, clusterCounts)
    +    this
    +  }
    +
    +  /** Initialize random centers, requiring only the number of dimensions. */
    +  def setRandomCenters(d: Int): this.type = {
    +    val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray
    +    val clusterCounts = Array.fill(this.k)(0).map(_.toLong)
    --- End diff --
    
    `new Array[Long](this.k)`
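
The suggestion above relies on the JVM zero-initializing primitive arrays, so `new Array[Long](k)` already holds `k` zeros without building an intermediate `Array[Int]` and mapping it. A minimal sketch to check the equivalence, assuming nothing beyond the Scala standard library (the object name `ZeroCounts`, its helper names, and `k = 3` are illustrative and not part of the PR):

```scala
object ZeroCounts {
  // Form used in the diff: builds an Array[Int] of zeros, then maps it to Long.
  def viaFill(k: Int): Array[Long] = Array.fill(k)(0).map(_.toLong)

  // Reviewer's suggestion: a primitive Long array is zero-initialized on allocation.
  def viaNew(k: Int): Array[Long] = new Array[Long](k)

  def main(args: Array[String]): Unit = {
    val k = 3 // illustrative cluster count
    // Compare the two arrays element by element.
    println(viaFill(k).sameElements(viaNew(k)))
  }
}
```

Both forms give the same counts; the second avoids one temporary array and one pass over it.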




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60880661
  
      [Test build #22428 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22428/consoleFull) for   PR 2942 at commit [`9f7aea9`](https://github.com/apache/spark/commit/9f7aea9eac3c64f646d1783909e0e2d155663399).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamingKMeansModel(`
      * `class StreamingKMeans(`





[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2942#discussion_r19490254
  
    --- Diff: docs/mllib-clustering.md ---
    @@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap
     section of the Spark
     Quick Start guide. Be sure to also include *spark-mllib* to your build file as
     a dependency.
    +
    +## Streaming clustering
    +
    +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using:
    +
    +`\begin{equation}
    +    c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
    +\end{equation}`
    +`\begin{equation}
    +    n_{t+1} = n_t + m_t  
    +\end{equation}`
    +
    +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an exponentially-weighted moving average.
    +
    +### Examples
    +
    +This example shows how to estimate clusters on streaming data.
    +
    +<div class="codetabs">
    +
    +<div data-lang="scala" markdown="1">
    +
    +First we import the necessary classes.
    +
    +{% highlight scala %}
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.clustering.StreamingKMeans
    +
    +{% endhighlight %}
    +
    +Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data. 
    --- End diff --
    
    line too wide
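
For readers following the documentation diff above, the decayed update rule for a single cluster can be sketched in plain Scala. This is an illustration under stated assumptions, not the PR's implementation: the names `DecayUpdateSketch`, `Cluster`, and `update` are hypothetical, centers are plain `Array[Double]` rather than MLlib vectors, and the batch is assumed to contribute at least one point to the cluster.

```scala
object DecayUpdateSketch {
  // State of one cluster: center c_t and point count n_t (hypothetical type).
  final case class Cluster(center: Array[Double], count: Long)

  // Weighted-average form quoted in the docs diff:
  //   c_{t+1} = (c_t * n_t * alpha + x_t * m_t) / (n_t * alpha + m_t)
  //   n_{t+1} = n_t + m_t
  // Assumes batchCount > 0, so the denominator is never zero.
  def update(old: Cluster, batchCenter: Array[Double], batchCount: Long,
             alpha: Double): Cluster = {
    val discounted = old.count * alpha   // n_t * alpha
    val denom = discounted + batchCount  // n_t * alpha + m_t
    val center = old.center.zip(batchCenter).map { case (c, x) =>
      (c * discounted + x * batchCount) / denom
    }
    Cluster(center, old.count + batchCount)
  }

  def main(args: Array[String]): Unit = {
    val old = Cluster(Array(0.0, 0.0), 10L)
    val batch = Array(1.0, 2.0)
    // alpha = 1: all past points keep full weight.
    println(update(old, batch, 10L, 1.0).center.mkString(", "))
    // alpha = 0: the past is ignored and the center becomes the batch center.
    println(update(old, batch, 10L, 0.0).center.mkString(", "))
  }
}
```

The incremental form in the Scala diff for "batches" units, `oldCentroid + (newCentroid - oldCentroid) * newCount / (a * oldCount + newCount)`, is algebraically the same as this weighted average.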




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by anantasty <gi...@git.apache.org>.
Github user anantasty commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-60795596
  
    I would certainly be interested in doing that. I just wasn't sure if it was
    better to do it as a separate PR/task.
    On Oct 28, 2014 11:19 AM, "Xiangrui Meng" <no...@github.com> wrote:
    
    > @anantasty <https://github.com/anantasty> This PR is still in review. If
    > you are interested in Python bindings for streaming algorithms, could you
    > help add one for StreamingLinearRegression? Thanks!
    >
    >
    > https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/2942#issuecomment-60794676>.
    >




[GitHub] spark pull request: Streaming KMeans [MLLIB][SPARK-3254]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2942#issuecomment-61241988
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22607/

