Posted to user@spark.apache.org by "Chandra Mohan, Ananda Vel Murugan" <An...@honeywell.com> on 2016/06/10 11:04:35 UTC

KMeans Streaming process flow

Hi,

I am in the process of implementing a Spark Streaming application to cluster some events. I have a DStream of vectors, one built from each event, and now I am trying to apply clustering to it. I referred to the following example in the Spark GitHub repository.

There is a trainOn method and a predictOnValues method, and I am confused about how to map this example to my use case. In my case I would be receiving the stream of events 24*7, and I am not sure how to split that continuous data between the train and predict methods. Or should this streaming application be run in a train mode for a few days and in a predict mode later? I am not able to find a suitable example on the web. Please advise. Thanks.

https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingKMeansExample {

  def main(args: Array[String]) {
    if (args.length != 5) {
      System.err.println(
        "Usage: StreamingKMeansExample " +
          "<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>")
      System.exit(1)
    }

    val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample")
    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

    // Training data: files of plain vectors; test data: files of labeled points.
    val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

    // The model starts from random centers and is updated with every training batch.
    val model = new StreamingKMeans()
      .setK(args(3).toInt)
      .setDecayFactor(1.0)
      .setRandomCenters(args(4).toInt, 0.0)

    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
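
For reference, here is a minimal sketch (not from the example above) of the pattern I have in mind, where a single DStream feeds both trainOn and predictOn; the object name, stream parameter, k, dimension, and decay factor below are all placeholders:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

object ContinuousClusteringSketch {

  // eventVectors is assumed to be the DStream[Vector] built from the events.
  def cluster(eventVectors: DStream[Vector]): Unit = {
    val model = new StreamingKMeans()
      .setK(10)                  // placeholder cluster count
      .setDecayFactor(0.9)       // < 1.0 so old batches are gradually forgotten
      .setRandomCenters(50, 0.0) // placeholder vector dimension

    // Update the cluster centers with every incoming batch...
    model.trainOn(eventVectors)

    // ...and assign the events of the same stream to their nearest center.
    model.predictOn(eventVectors).print()
  }
}

My understanding is that trainOn updates the centers on every batch, while the decay factor (or setHalfLife) controls how quickly older events stop influencing them, so no explicit split of the 24*7 data would be needed; I would appreciate confirmation that this is the intended usage.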

Regards,
Anand.C