You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Anupam Bagchi <an...@rocketmail.com> on 2015/07/12 08:44:28 UTC

Moving average using Spark and Scala

I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:

Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)

For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.

Pick the last 30 days of bytes from this ordered set.

Find the moving average of bytes for the last date using a time period of 30.

Find the standard deviation of the bytes for the final date using a time period of 30.

Return two values in the result (mean - kstddev) and (mean + kstddev) [Assume k = 3]

I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.

Here is the data structure for the dataset.

package com.testing
case class DeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
The DeviceAnalyzer class looks like this:

I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(allaggregates) // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}
I would really appreciate if someone can suggests improvements for the following:

The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.

Thanks in advance for your help.

Anupam Bagchi