Posted to user@spark.apache.org by ramach1776 <ra...@s1776.com> on 2016/02/19 02:59:42 UTC

StreamingKMeans does not update cluster centroid locations

I have a streaming application in which I train the model on a receiver input
stream in 4-second batches:

import org.apache.spark.mllib.linalg.Vectors

val stream = ssc.receiverStream(receiver) // receiver gets new data every batch
model.trainOn(stream.map(Vectors.parse))
If I then print the centers with
model.latestModel.clusterCenters.foreach(println)

the cluster centers remain unchanged from the initial values they got during
the first iteration (when the streaming app started).

When I use the model to predict cluster assignments for labeled input, the
assignments change over time, as expected:

      testData.transform {rdd =>
        rdd.map(lp => (lp.label, model.latestModel().predict(lp.features)))
      }.print()










--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: StreamingKMeans does not update cluster centroid locations

Posted by Bryan Cutler <cu...@gmail.com>.
This simple example works for me; it prints the updated model centers.
I'm running from the master branch.

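    import scala.collection.mutable

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.clustering.StreamingKMeans
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext("local[2]", "test")
    val ssc = new StreamingContext(sc, Seconds(1))

    // Two 1-D clusters starting at 0.0 and 1.0; decay 0.0 gives no weight to earlier batches.
    val kMeans = new StreamingKMeans()
      .setK(2)
      .setDecayFactor(0.0)
      .setInitialCenters(Array(Vectors.dense(0.0), Vectors.dense(1.0)), Array(1.0, 1.0))

    // By default, queueStream emits one queued RDD per batch.
    val rddQueue = new mutable.Queue[RDD[Vector]]()

    val data1 = sc.parallelize(Array(
      Vectors.dense(-0.5),
      Vectors.dense(0.6),
      Vectors.dense(0.8)
    ))

    val data2 = sc.parallelize(Array(
      Vectors.dense(0.2),
      Vectors.dense(-0.1),
      Vectors.dense(0.3)
    ))

    rddQueue += data1
    rddQueue += data2

    val inputStream = ssc.queueStream(rddQueue)

    kMeans.trainOn(inputStream)

    val predictStream = kMeans.predictOn(inputStream)

    // Called on the driver once per batch, so the centers are printed every batch.
    def collect(rdd: RDD[Int]): Unit = {
      val rdd_collect = rdd.collect()
      println(s"predict_results: ${rdd_collect.mkString(",")}")
      kMeans.latestModel().clusterCenters.foreach(println)
    }

    predictStream.foreachRDD(collect _)

    ssc.start()
    ssc.awaitTermination()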
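To see what the printed centers should look like, here is a plain-Scala sketch (no Spark required) that applies the streaming k-means update rule described in the MLlib documentation to the two batches above. It assumes the rule c' = (c·n·α + x·m) / (n·α + m), where c and n are a center and its weight, x and m are the mean and count of the batch points nearest to that center, and α is the decay factor. The object and method names are made up; this is not MLlib's code and it ignores any extra bookkeeping the library may do. With decay factor 0.0, each center that receives points simply moves to their mean, so the centers should change after every batch.

```scala
// Illustration only: a hypothetical, simplified re-implementation of the streaming k-means update.
object StreamingUpdateSketch {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Index of the center closest to p.
  def nearest(centers: Seq[Point], p: Point): Int =
    centers.indices.minBy(i => sqDist(centers(i), p))

  // One batch: c' = (c * n * alpha + sum of assigned points) / (n * alpha + m).
  // The stored weight is discounted to n * alpha before adding m; with alpha = 0.0 this
  // choice does not affect the centers. Centers with no assigned points keep their coordinates.
  def updateBatch(centers: Seq[Point], weights: Seq[Double], batch: Seq[Point],
                  alpha: Double): (Seq[Point], Seq[Double]) = {
    val assigned = batch.groupBy(p => nearest(centers, p))
    val next = centers.indices.map { i =>
      val n = weights(i) * alpha
      assigned.get(i) match {
        case None => (centers(i), n)
        case Some(pts) =>
          val w = n + pts.size
          val c = centers(i).indices.map(d => (centers(i)(d) * n + pts.map(_(d)).sum) / w).toArray
          (c, w)
      }
    }
    (next.map(_._1), next.map(_._2))
  }

  def main(args: Array[String]): Unit = {
    // Same setup as the example above: centers 0.0 and 1.0, weights 1.0, decay 0.0.
    var centers: Seq[Point] = Seq(Array(0.0), Array(1.0))
    var weights: Seq[Double] = Seq(1.0, 1.0)
    val batches = Seq(Seq(-0.5, 0.6, 0.8), Seq(0.2, -0.1, 0.3)).map(_.map(x => Array(x)))
    for (batch <- batches) {
      val (c, w) = updateBatch(centers, weights, batch, alpha = 0.0)
      centers = c
      weights = w
      println(centers.map(_.mkString("[", ",", "]")).mkString(" "))
    }
  }
}
```

Running `main` prints `[-0.5] [0.7]` after the first batch and `[-0.1] [0.25]` after the second, i.e. the centers move every batch under this rule.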

On Fri, Feb 19, 2016 at 1:15 PM, krishna ramachandran <ra...@s1776.com>
wrote:


Re: StreamingKMeans does not update cluster centroid locations

Posted by krishna ramachandran <ra...@s1776.com>.
Also, the cluster centroids I get in streaming mode (some with negative
values) do not make sense. If I use the same data and run in batch mode with

KMeans.train(sc.parallelize(parsedData), numClusters, numIterations)

the cluster centers are what you would expect.

Krishna
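One possible reading of the negative values, not confirmed in this thread: the linked example appears to start from random centers with weight 0.0, and setRandomCenters, as far as I know, draws standard-normal coordinates, so the starting centers sit near the origin. A println placed in driver code before ssc.start() runs once and shows only those starting values. The plain-Scala sketch below (made-up starting centers, hypothetical names, not MLlib's code) pushes the six sample points Krishna posted in this thread through one batch of the documented update rule with decay factor 1.0. Because the third coordinate is in the hundreds or thousands, every point lands on the same center, which jumps to the batch mean, while the other two keep their near-origin starting values until a later batch assigns them points.

```scala
// Illustration only: one streaming k-means batch starting from zero-weight centers.
object ZeroWeightStartSketch {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Applies c' = (c * n * alpha + sum) / (n * alpha + m) per center; centers with no points keep
  // their coordinates and only have their weight discounted to n * alpha.
  def step(centers: Seq[Point], weights: Seq[Double], batch: Seq[Point],
           alpha: Double): (Seq[Point], Seq[Double]) = {
    val assigned = batch.groupBy(p => centers.indices.minBy(i => sqDist(centers(i), p)))
    val out = centers.indices.map { i =>
      val n = weights(i) * alpha
      assigned.get(i).fold((centers(i), n)) { pts =>
        val w = n + pts.size
        (centers(i).indices.map(d => (centers(i)(d) * n + pts.map(_(d)).sum) / w).toArray, w)
      }
    }
    (out.map(_._1), out.map(_._2))
  }

  // Hypothetical near-origin starting centers (made-up values, not real setRandomCenters output).
  val start: Seq[Point] = Seq(Array(0.3, -1.2, 0.5), Array(-0.7, 0.4, -0.9), Array(1.1, 0.2, -0.3))

  // The sample data points posted in this thread.
  val batch: Seq[Point] = Seq(
    Array(1.0, 1.0, 385.2241452777778),
    Array(3.0, 1.0, 384.7529463888889),
    Array(4.0, 1.0, 3083.2778025),
    Array(2.0, 4.0, 6226.402321388889),
    Array(1.0, 2.0, 785.8426655555555),
    Array(5.0, 1.0, 6706.054241388889))

  def main(args: Array[String]): Unit = {
    val (centers, weights) = step(start, Seq(0.0, 0.0, 0.0), batch, alpha = 1.0)
    centers.zip(weights).foreach { case (c, w) => println(s"${c.mkString("[", ", ", "]")} weight=$w") }
  }
}
```

Running `main` should show the first center at roughly [2.67, 1.67, 2928.59] with weight 6.0, and the other two unchanged at their starting values with weight 0.0.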



On Fri, Feb 19, 2016 at 12:49 PM, krishna ramachandran <ra...@s1776.com>
wrote:


Re: StreamingKMeans does not update cluster centroid locations

Posted by krishna ramachandran <ra...@s1776.com>.
OK, I will share a simple example soon. Meanwhile, you should be able to see
this behavior using the example here:

https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

Modify it slightly to include

model.latestModel.clusterCenters.foreach(println)

(after model.trainOn), then add new files to trainingDir periodically and
monitor the output.

My data points have 3 dimensions; they look like this:

[1, 1, 385.2241452777778]
[3, 1, 384.7529463888889]
[4, 1, 3083.2778025]
[2, 4, 6226.402321388889]
[1, 2, 785.8426655555555]
[5, 1, 6706.054241388889]
........

Please let me know if I missed something.

Krishna





On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler <cu...@gmail.com> wrote:


Re: StreamingKMeans does not update cluster centroid locations

Posted by Bryan Cutler <cu...@gmail.com>.
Can you share more of your code so I can reproduce this issue? The model
should be updated with each batch, but I can't tell what is happening from
what you have posted so far.

On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran <ra...@s1776.com>
wrote:


Re: StreamingKMeans does not update cluster centroid locations

Posted by krishna ramachandran <ra...@s1776.com>.
Hi Bryan,
Agreed, it is a single statement that prints the centers once for *every*
streaming batch (4 seconds). Remember this is in streaming mode and the
receiver has fresh data every batch. Since the model is trained
continuously, I expect the centroids to change with the incoming stream (at
least until convergence).

But I see the same centers for the entire duration; I ran the app for
several hours with a custom receiver.

Yes, I am using the latestModel to predict on "labeled" test data, but I
would also like to know where my centers are.

regards
Krishna



On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler <cu...@gmail.com> wrote:


Re: StreamingKMeans does not update cluster centroid locations

Posted by Bryan Cutler <cu...@gmail.com>.
Could you elaborate on where the issue is? You say calling
model.latestModel.clusterCenters.foreach(println) doesn't show an updated
model, but that is just a single statement that prints the centers once.

Also, is there any reason you don't predict on the test data like this?

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()



On Thu, Feb 18, 2016 at 5:59 PM, ramach1776 <ra...@s1776.com> wrote:
