Posted to user@spark.apache.org by Anupam Bagchi <an...@rocketmail.com> on 2015/07/13 19:07:05 UTC

Finding moving average using Spark and Scala

I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:   
   - Read the dataset from HDFS. A few sample lines look like this:

deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
   
   - Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
   - For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
   - Pick the last 30 days of bytes from this ordered set.
   - Find the moving average of bytes for the last date using a time period of 30.
   - Find the standard deviation of the bytes for the final date using a time period of 30.
   - Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k = 3]. A minimal sketch of this calculation appears just below this list.
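To make the last four steps concrete, here is a minimal sketch of the calculation on one device's date-sorted byte values (plain Scala, independent of Spark; the sample numbers are made up):

// Sketch only: moving average, standard deviation, and mean +/- k*stddev
// over the last `period` byte values of a single device.
object BandSketch {
  def bands(bytesSortedByDate: Array[Double], period: Int = 30, k: Double = 3.0): (Double, Double) = {
    val window = bytesSortedByDate.takeRight(period)   // last 30 days (or fewer if the device was inactive)
    val mean   = window.sum / window.length            // moving average for the last date
    val stddev = math.sqrt(window.map(x => (x - mean) * (x - mean)).sum / window.length)
    (mean - k * stddev, mean + k * stddev)             // the two values to return
  }

  def main(args: Array[String]): Unit = {
    val sample = Array(246620.0, 1907.0, 1906.0, 2349.0, 2525.0)  // made-up values
    println(bands(sample))
  }
}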
I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually has to run on a billion rows.

Here is the data structure for the dataset:

package com.testing
case class DailyDeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DailyDeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}

The DeviceAnalyzer class looks like this:
I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(allaggregates) // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}

I would really appreciate it if someone could suggest improvements for the following:
   - The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
   - I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
   - For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
   - At the end I also need to write the results to HDFS, for which the RDD class provides a method - another reason I would like to retain everything as an RDD. (A sketch of this last step appears just below this list.)
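As a sketch of that last point only (not my actual job), assuming the per-device mean and standard deviation have already been computed into a pair RDD, the write step could look like this; the sample numbers and output path are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object WriteBandsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Write Bands Sketch"))
    // Placeholder data: in the real job this would be the per-device
    // (mean, stddev) computed from the last 30 days of bytes.
    val deviceStats = sc.parallelize(Seq((15590657, (2000.0, 150.0)), (14066921, (1900.0, 80.0))))
    val k = 3.0
    val bands = deviceStats.map { case (deviceId, (mean, stddev)) =>
      s"$deviceId,${mean - k * stddev},${mean + k * stddev}"   // (mean - k*stddev),(mean + k*stddev)
    }
    bands.saveAsTextFile(args(0))   // args(0): HDFS output directory
    sc.stop()
  }
}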

Thanks in advance for your help.
Anupam Bagchi 

Re: Finding moving average using Spark and Scala

Posted by Anupam Bagchi <an...@rocketmail.com>.
Thanks Feynman for your direction.

I was able to solve this problem by calling the Spark API from Java.

Here is a code snippet that may help other people who might face the same challenge.

        if (args.length > 2) {
            earliestEventDate = Integer.parseInt(args[2]);
        } else {
            Date now = Calendar.getInstance().getTime();
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
            earliestEventDate = Integer.parseInt(dateFormat.format(new Date(now.getTime()-30L*AnalyticsConstants.ONE_DAY_IN_MILLISECONDS)));
        }
        System.out.println("Filtering out dates earlier than: " + earliestEventDate);
        JavaRDD<String> logLines = sc.textFile(inputFile);

        // Convert the text log lines to DailyDeviceAggregates objects and cache them
        JavaRDD<DailyDeviceAggregates> accessLogs = logLines.map(Functions.PARSE_DEVICE_AGGREGATE_LINE).filter(new Function<DailyDeviceAggregates, Boolean>() {
            @Override
            public Boolean call(DailyDeviceAggregates value) {
                return (value.getEventdate() >= earliestEventDate);
            }
        }).cache();
        // accessLogs.saveAsTextFile("accessLogs.saved");

        JavaPairRDD<Object, Iterable<DailyDeviceAggregates>> groupMap = accessLogs.groupBy(new Function<DailyDeviceAggregates, Object>() {
            @Override
            public Object call(DailyDeviceAggregates agg) throws Exception {
                return agg.getDevice_id();
            }
        });
        // groupMap.saveAsTextFile("groupedAccessLogs.saved");

        JavaPairRDD<Object, DailyDeviceSummary> deviceCharacteristics = groupMap.mapValues(new Function<Iterable<DailyDeviceAggregates>, DailyDeviceSummary>() {
            @Override
            public DailyDeviceSummary call(Iterable<DailyDeviceAggregates> allDeviceDataForMonth) throws Exception {
                // First task is to sort the input values by eventdate
                ArrayList<DailyDeviceAggregates> arr = new ArrayList<DailyDeviceAggregates>();
                for (DailyDeviceAggregates agg: allDeviceDataForMonth) {
                    arr.add(agg);
                }
                Collections.sort(arr);
                // Done sorting

                double bytesTransferred[] = new double[arr.size()];
                double bytesIn[] = new double[arr.size()];
                double bytesOut[] = new double[arr.size()];

                DailyDeviceAggregates lastAggregate = null;
                int index = 0;
                for (DailyDeviceAggregates aggregate : arr) {
                    // System.out.println(aggregate);
                    bytesIn[index] = aggregate.getBytes_in();
                    bytesOut[index] = aggregate.getBytes_out();
                    bytesTransferred[index] = aggregate.getBytes_transferred();
                    index++;
                    lastAggregate = aggregate;
                }
                BollingerBands bollingerBytesTransferrred = new BollingerBands(bytesTransferred, 30, 2.0);
                BollingerBands bollingerBytesIn = new BollingerBands(bytesIn, 30, 2.0);
                BollingerBands bollingerBytesOut = new BollingerBands(bytesOut, 30, 2.0);

                return new DailyDeviceSummary(lastAggregate.getAccount_id(), lastAggregate.getDevice_id(), index,
                        bollingerBytesIn.getLastMean(), bollingerBytesOut.getLastMean(), bollingerBytesTransferrred.getLastMean(),
                        bollingerBytesIn.getLastStandardDeviation(), bollingerBytesOut.getLastStandardDeviation(), bollingerBytesTransferrred.getLastStandardDeviation(),
                        (long)bollingerBytesIn.getLastUpperThreshold(), (long)bollingerBytesOut.getLastUpperThreshold(), (long)bollingerBytesTransferrred.getLastUpperThreshold(),
                        (long)bollingerBytesIn.getLastLowerThreshold(), (long)bollingerBytesOut.getLastLowerThreshold(), (long)bollingerBytesTransferrred.getLastLowerThreshold());
            }
        });

        deviceCharacteristics.values().saveAsTextFile(outputFile);
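The date cutoff at the top of the snippet can be expressed the same way in Scala; this is just an illustrative sketch of that one computation (an integer yyyyMMdd value for "30 days before now"), not part of the job above:

import java.text.SimpleDateFormat
import java.util.Calendar

object EarliestEventDateSketch {
  // yyyyMMdd integer for `daysBack` days before the current date,
  // suitable for filtering with eventdate >= earliestEventDate.
  def earliestEventDate(daysBack: Int = 30): Int = {
    val cal = Calendar.getInstance()
    cal.add(Calendar.DAY_OF_MONTH, -daysBack)
    new SimpleDateFormat("yyyyMMdd").format(cal.getTime).toInt
  }

  def main(args: Array[String]): Unit =
    println("Filtering out dates earlier than: " + earliestEventDate())
}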

Anupam Bagchi


> On Jul 14, 2015, at 10:21 AM, Feynman Liang <fl...@databricks.com> wrote:
> 
> If your rows may have NAs in them, I would process each column individually by first projecting the column ( map(x => x.nameOfColumn) ), filtering out the NAs, then running a summarizer over each column.
> 
> Even if you have many rows, after summarizing you will only have a vector of length #columns.
> 
> On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
> Hello Feynman,
> 
> Actually in my case, the vectors I am summarizing over will not have the same dimension since many devices will be inactive on some days. This is at best a sparse matrix where we take only the active days and attempt to fit a moving average over it.
> 
> The reason I would like to save it to HDFS is that there are really several million (almost a billion) devices for which this data needs to be written. I am perhaps writing a very few columns, but the number of rows is pretty large.
> 
> Given the above two cases, is using MultivariateOnlineSummarizer not a good idea then?
> 
> Anupam Bagchi
> 
> 
>> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>> 
>> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>> 
>> Make sure all the vectors you are summarizing over have the same dimension.
>> 
>> Why would you want to write a MultivariateOnlineSummary object (which can be represented with a couple Double's) into a distributed filesystem like HDFS?
>> 
>> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>> Thank you Feynman for the lead.
>> 
>> I was able to modify the code using clues from the RegressionMetrics example. Here is what I got now.
>> 
>> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>> 
>> // Calculate statistics based on bytes-transferred
>> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> println(deviceIdsMap.collect().deep.mkString("\n"))
>> 
>> val summary: MultivariateStatisticalSummary = {
>>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>>     case (deviceId, allaggregates) => Vectors.dense({
>>       val sortedAggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedAggregates)
>>       sortedAggregates.map(dda => dda.bytes.toDouble)
>>     })
>>   }.aggregate(new MultivariateOnlineSummarizer())(
>>       (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
>>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>>     )
>>   summary
>> }
>> It compiles fine. But I am now getting an exception as follows at Runtime.
>> 
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>>         at scala.Predef$.require(Predef.scala:233)
>>         at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>>         at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:722)
>> 
>> Can’t tell where exactly I went wrong. Also, how do I take the MultivariateOnlineSummary object and write it to HDFS? I have the MultivariateOnlineSummary object with me, but I really need an RDD to call saveAsTextFile() on it.
>> 
>> Anupam Bagchi
>> 
>> 
>>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>>> 
> A good example is RegressionMetrics <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregateByKey is used there.
>>> 
>>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>>> Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.
>>> 
>>> I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.
>>> 
>>> I tried to use your suggestion of using MultiVariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Now because RDDs are immutable, I cannot use a foreach loop to iterate through the groupby results and individually add the values in an RDD - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering since I am not used to the lambda expressions that Scala uses.
>>> 
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>> 
>>>     val logFile = args(0)
>>> 
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>> 
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>>>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>>>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot 
>>>     // add individually to an immutable RDD
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1  // This is the device ID
>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>> 
>>>       val sortedaggregates = allaggregates.toArray
>>>       Sorting.quickSort(sortedaggregates)
>>> 
>>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>> 
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>> 
>>>     })
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>> 
>>> 
>>>     sc.stop()
>>>   }
>>> }
>>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks a lot for your help.
>>> 
>>> Anupam Bagchi
>>> 
>>> 
>>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>>>> 
>>>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>>>> allaggregates.toArray allocates and creates a new array separate from allaggregates; that new array is what Sorting.quickSort sorts, so allaggregates itself remains unsorted. Try:
>>>> val sortedAggregates = allaggregates.toArray
>>>> Sorting.quickSort(sortedAggregates)
>>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer. 
>>>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>>>> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer 
>>>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
>>>> 
>>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>>>> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
>>>> Read the dataset from HDFS. A few sample lines look like this:
>>>> deviceid,bytes,eventdate
>>>> 15590657,246620,20150630
>>>> 14066921,1907,20150621
>>>> 14066921,1906,20150626
>>>> 6522013,2349,20150626
>>>> 6522013,2525,20150613
>>>> Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
>>>> For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
>>>> Pick the last 30 days of bytes from this ordered set.
>>>> Find the moving average of bytes for the last date using a time period of 30.
>>>> Find the standard deviation of the bytes for the final date using a time period of 30.
>>>> Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k = 3]
>>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.
>>>> Here is the data structure for the dataset.
>>>> package com.testing
>>>> case class DailyDeviceAggregates (
>>>>                         device_id: Integer,
>>>>                         bytes: Long,
>>>>                         eventdate: Integer
>>>>                    ) extends Ordered[DailyDeviceAggregates] {
>>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>>     eventdate - that.eventdate
>>>>   }
>>>> }
>>>> object DailyDeviceAggregates {
>>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>>     val c = logline.split(",")
>>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>>   }
>>>> }
>>>> The DeviceAnalyzer class looks like this:
>>>> I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>>>> 
>>>> import com.testing.DailyDeviceAggregates
>>>> import org.apache.spark.{SparkContext, SparkConf}
>>>> import org.apache.spark.mllib.linalg.Vector
>>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>> 
>>>> import scala.util.Sorting
>>>> 
>>>> object DeviceAnalyzer {
>>>>   def main(args: Array[String]) {
>>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>>     val sc = new SparkContext(sparkConf)
>>>> 
>>>>     val logFile = args(0)
>>>> 
>>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>> 
>>>>     // Calculate statistics based on bytes
>>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>> 
>>>>     deviceIdsMap.foreach(a => {
>>>>       val device_id = a._1  // This is the device ID
>>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>>> 
>>>>       println(allaggregates)
>>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>>       println(allaggregates) // This does not work - results are not sorted !!
>>>> 
>>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
>>>>       val count = byteValues.count(A => true)
>>>>       val sum = byteValues.sum
>>>>       val xbar = sum / count
>>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>> 
>>>>       val vector: Vector = Vectors.dense(byteValues)
>>>>       println(vector)
>>>>       println(device_id + "," + xbar + "," + stddev)
>>>> 
>>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>>       //println(vector)
>>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>>     })
>>>> 
>>>>     sc.stop()
>>>>   }
>>>> }
>>>> I would really appreciate it if someone could suggest improvements for the following:
>>>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>>>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>>> 
>>>> Thanks in advance for your help.
>>>> 
>>>> Anupam Bagchi
>>>>  
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 


Re: Finding moving average using Spark and Scala

Posted by Feynman Liang <fl...@databricks.com>.
If your rows may have NAs in them, I would process each column individually
by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
the NAs, then running a summarizer over each column.

Even if you have many rows, after summarizing you will only have a vector
of length #columns.
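A minimal sketch of that per-column approach, with a made-up Row case class (optional fields standing in for possibly-missing values) and made-up numbers:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

object ColumnSummarySketch {
  case class Row(bytesIn: Option[Double], bytesOut: Option[Double])  // illustrative record type

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Column Summary Sketch"))
    val rows = sc.parallelize(Seq(Row(Some(1907.0), None), Row(Some(2349.0), Some(2525.0))))

    // Project one column, drop the NAs, then summarize it with one-dimensional samples.
    val bytesInSummary = rows
      .flatMap(_.bytesIn)                      // projection and NA filter in one step
      .map(v => Vectors.dense(v))
      .aggregate(new MultivariateOnlineSummarizer())(
        (summ, v) => summ.add(v),
        (s1, s2)  => s1.merge(s2))

    println("bytesIn mean=" + bytesInSummary.mean + " variance=" + bytesInSummary.variance)
    sc.stop()
  }
}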

On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com
> wrote:

> Hello Feynman,
>
> Actually in my case, the vectors I am summarizing over will not have the
> same dimension since many devices will be inactive on some days. This is at
> best a sparse matrix where we take only the active days and attempt to fit
> a moving average over it.
>
> The reason I would like to save it to HDFS is that there are really
> several million (almost a billion) devices for which this data needs to be
> written. I am perhaps writing a very few columns, but the number of rows is
> pretty large.
>
> Given the above two cases, is using MultivariateOnlineSummarizer not a
> good idea then?
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fl...@databricks.com> wrote:
>
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>
> Make sure all the vectors you are summarizing over have the same dimension.
>
> Why would you want to write a MultivariateOnlineSummary object (which can
> be represented with a couple Double's) into a distributed filesystem like
> HDFS?
>
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <
> anupam_bagchi@rocketmail.com> wrote:
>
>> Thank you Feynman for the lead.
>>
>> I was able to modify the code using clues from the RegressionMetrics
>> example. Here is what I got now.
>>
>> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>> // Calculate statistics based on bytes-transferred
>> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> println(deviceIdsMap.collect().deep.mkString("\n"))
>>
>> val summary: MultivariateStatisticalSummary = {
>>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>>     case (deviceId, allaggregates) => Vectors.dense({
>>       val sortedAggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedAggregates)
>>       sortedAggregates.map(dda => dda.bytes.toDouble)
>>     })
>>   }.aggregate(new MultivariateOnlineSummarizer())(
>>       (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
>>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>>     )
>>   summary
>> }
>>
>> It compiles fine. But I am now getting an exception as follows at Runtime.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
>> failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
>> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
>> when adding new sample. Expecting 8 but got 14.
>>         at scala.Predef$.require(Predef.scala:233)
>>         at
>> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>>         at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>>         at
>> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>>         at
>> scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:722)
>>
>> Can’t tell where exactly I went wrong. Also, how do I take the
>> MultivariateOnlineSummary object and write it to HDFS? I have the
>> MultivariateOnlineSummary object with me, but I really need an RDD to call
>> saveAsTextFile() on it.
>>
>> Anupam Bagchi
>>
>>
>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fl...@databricks.com> wrote:
>>
>> A good example is RegressionMetrics
>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
>> use of MultivariateOnlineSummarizer to aggregate statistics across
>> labels and residuals; take a look at how aggregateByKey is used there.
>>
>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <
>> anupam_bagchi@rocketmail.com> wrote:
>>
>>> Thank you Feynman for your response. Since I am very new to Scala I may
>>> need a bit more hand-holding at this stage.
>>>
>>> I have been able to incorporate your suggestion about sorting - and it
>>> now works perfectly. Thanks again for that.
>>>
>>> I tried to use your suggestion of using MultiVariateOnlineSummarizer,
>>> but could not proceed further. For each deviceid (the key) my goal is to
>>> get a vector of doubles on which I can query the mean and standard
>>> deviation. Now because RDDs are immutable, I cannot use a foreach loop to
>>> iterate through the groupby results and individually add the values in an
>>> RDD - Spark does not allow that. I need to apply the RDD functions directly
>>> on the entire set to achieve the transformations I need. This is where I am
>>> faltering since I am not used to the lambda expressions that Scala uses.
>>>
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     val logFile = args(0)
>>>
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>
>>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>>>
>>>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>>>
>>>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot
>>>
>>>     // add individually to an immutable RDD
>>>
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1  // This is the device ID
>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>>
>>>       val sortedaggregates = allaggregates.toArray
>>>       Sorting.quickSort(sortedaggregates)
>>>
>>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>>     })
>>>
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>
>>>
>>>     sc.stop()
>>>   }
>>> }
>>>
>>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
>>> Thanks a lot for your help.
>>>
>>> Anupam Bagchi
>>>
>>>
>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fl...@databricks.com>
>>> wrote:
>>>
>>> The call to Sorting.quicksort is not working. Perhaps I am calling it
>>>> the wrong way.
>>>
>>> allaggregates.toArray allocates and creates a new array separate from
>>> allaggregates; that new array is what Sorting.quickSort sorts, so
>>> allaggregates itself remains unsorted. Try:
>>> val sortedAggregates = allaggregates.toArray
>>> Sorting.quickSort(sortedAggregates)
>>>
>>>> I would like to use the Spark mllib class
>>>> MultivariateStatisticalSummary to calculate the statistical values.
>>>
>>> MultivariateStatisticalSummary is a trait (similar to a Java interface);
>>> you probably want to use MultivariateOnlineSummarizer.
>>>
>>>> For that I would need to keep all my intermediate values as RDD so that
>>>> I can directly use the RDD methods to do the job.
>>>
>>> Correct; you would do an aggregate using the add and merge functions
>>> provided by MultivariateOnlineSummarizer
>>>
>>>> At the end I also need to write the results to HDFS for which there is
>>>> a method provided on the RDD class to do so, which is another reason I
>>>> would like to retain everything as RDD.
>>>
>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to
>>> HDFS, or you could unpack the relevant statistics from
>>> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
>>> and then write.
>>>
>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
>>> anupam_bagchi@rocketmail.com> wrote:
>>>
>>>> I have to do the following tasks on a dataset using Apache Spark with
>>>> Scala as the programming language:
>>>>
>>>>    1. Read the dataset from HDFS. A few sample lines look like this:
>>>>
>>>>  deviceid,bytes,eventdate
>>>>  15590657,246620,20150630
>>>>  14066921,1907,20150621
>>>>  14066921,1906,20150626
>>>>  6522013,2349,20150626
>>>>  6522013,2525,20150613
>>>>
>>>>
>>>>    1. Group the data by device id. Thus we now have a map of deviceid
>>>>    => (bytes,eventdate)
>>>>    2. For each device, sort the set by eventdate. We now have an
>>>>    ordered set of bytes based on eventdate for each device.
>>>>    3. Pick the last 30 days of bytes from this ordered set.
>>>>    4. Find the moving average of bytes for the last date using a time
>>>>    period of 30.
>>>>    5. Find the standard deviation of the bytes for the final date
>>>>    using a time period of 30.
>>>>    6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>>>>    [Assume k = 3]
>>>>
>>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has
>>>> to run on a billion rows finally.
>>>> Here is the data structure for the dataset.
>>>>
>>>> package com.testing
>>>> case class DailyDeviceAggregates (
>>>>                         device_id: Integer,
>>>>                         bytes: Long,
>>>>                         eventdate: Integer
>>>>                    ) extends Ordered[DailyDeviceAggregates] {
>>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>>     eventdate - that.eventdate
>>>>   }
>>>> }
>>>> object DailyDeviceAggregates {
>>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>>     val c = logline.split(",")
>>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>>   }
>>>> }
>>>>
>>>> The DeviceAnalyzer class looks like this:
>>>> I have a very crude implementation that does the job, but it is not up
>>>> to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>>>> basic. Here is what I have now:
>>>>
>>>> import com.testing.DailyDeviceAggregates
>>>> import org.apache.spark.{SparkContext, SparkConf}
>>>> import org.apache.spark.mllib.linalg.Vector
>>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>> import scala.util.Sorting
>>>> object DeviceAnalyzer {
>>>>   def main(args: Array[String]) {
>>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>>     val sc = new SparkContext(sparkConf)
>>>>
>>>>     val logFile = args(0)
>>>>
>>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>>
>>>>     // Calculate statistics based on bytes
>>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>>
>>>>     deviceIdsMap.foreach(a => {
>>>>       val device_id = a._1  // This is the device ID
>>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>>>
>>>>       println(allaggregates)
>>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>>       println(allaggregates) // This does not work - results are not sorted !!
>>>>
>>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>>       val count = byteValues.count(A => true)
>>>>       val sum = byteValues.sum
>>>>       val xbar = sum / count
>>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>>
>>>>       val vector: Vector = Vectors.dense(byteValues)
>>>>       println(vector)
>>>>       println(device_id + "," + xbar + "," + stddev)
>>>>
>>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>>       //println(vector)
>>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>>     })
>>>>
>>>>     sc.stop()
>>>>   }
>>>> }
>>>>
>>>> I would really appreciate it if someone could suggest improvements for the
>>>> following:
>>>>
>>>>    1. The call to Sorting.quicksort is not working. Perhaps I am
>>>>    calling it the wrong way.
>>>>    2. I would like to use the Spark mllib class
>>>>    MultivariateStatisticalSummary to calculate the statistical values.
>>>>    3. For that I would need to keep all my intermediate values as RDD
>>>>    so that I can directly use the RDD methods to do the job.
>>>>    4. At the end I also need to write the results to HDFS for which
>>>>    there is a method provided on the RDD class to do so, which is another
>>>>    reason I would like to retain everything as RDD.
>>>>
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Anupam Bagchi
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>

Re: Finding moving average using Spark and Scala

Posted by Anupam Bagchi <an...@rocketmail.com>.
Hello Feynman,

Actually in my case, the vectors I am summarizing over will not have the same dimension since many devices will be inactive on some days. This is at best a sparse matrix where we take only the active days and attempt to fit a moving average over it.

The reason I would like to save it to HDFS is that there are really several million (almost a billion) devices for which this data needs to be written. I am perhaps writing a very few columns, but the number of rows is pretty large.

Given the above two cases, is using MultivariateOnlineSummarizer not a good idea then?

Anupam Bagchi


> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fl...@databricks.com> wrote:
> 
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
> 
> Make sure all the vectors you are summarizing over have the same dimension.
> 
> Why would you want to write a MultivariateOnlineSummary object (which can be represented with a couple Double's) into a distributed filesystem like HDFS?
> 
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
> Thank you Feynman for the lead.
> 
> I was able to modify the code using clues from the RegressionMetrics example. Here is what I got now.
> 
> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
> // Calculate statistics based on bytes-transferred
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> println(deviceIdsMap.collect().deep.mkString("\n"))
> 
> val summary: MultivariateStatisticalSummary = {
>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>     case (deviceId, allaggregates) => Vectors.dense({
>       val sortedAggregates = allaggregates.toArray
>       Sorting.quickSort(sortedAggregates)
>       sortedAggregates.map(dda => dda.bytes.toDouble)
>     })
>   }.aggregate(new MultivariateOnlineSummarizer())(
>       (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>     )
>   summary
> }
> It compiles fine. But I am now getting an exception as follows at Runtime.
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>         at scala.Predef$.require(Predef.scala:233)
>         at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>         at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:722)
> 
> Can’t tell where exactly I went wrong. Also, how do I take the MultivariateOnlineSummary object and write it to HDFS? I have the MultivariateOnlineSummary object with me, but I really need an RDD to call saveAsTextFile() on it.
> 
> Anupam Bagchi
> 
> 
>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>> 
>> A good example is RegressionMetrics <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregateByKey is used there.
>> 
>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>> Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.
>> 
>> I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.
>> 
>> I tried to use your suggestion of using MultiVariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Now because RDDs are immutable, I cannot use a foreach loop to iterate through the groupby results and individually add the values in an RDD - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering since I am not used to the lambda expressions that Scala uses.
>> 
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>> 
>>     val logFile = args(0)
>> 
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>> 
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot 
>>     // add individually to an immutable RDD
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1  // This is the device ID
>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>> 
>>       val sortedaggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedaggregates)
>> 
>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>> 
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>> 
>>     })
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>> 
>> 
>>     sc.stop()
>>   }
>> }
>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks a lot for your help.
>> 
>> Anupam Bagchi
>> 
>> 
>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>>> 
>>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>>> allaggregates.toArray allocates and creates a new array separate from allaggregates; that new array is what Sorting.quickSort sorts, so allaggregates itself remains unsorted. Try:
>>> val sortedAggregates = allaggregates.toArray
>>> Sorting.quickSort(sortedAggregates)
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer. 
>>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>>> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer 
>>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
>>> 
>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>>> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
>>> Read the dataset from HDFS. A few sample lines look like this:
>>> deviceid,bytes,eventdate
>>> 15590657,246620,20150630
>>> 14066921,1907,20150621
>>> 14066921,1906,20150626
>>> 6522013,2349,20150626
>>> 6522013,2525,20150613
>>> Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
>>> For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
>>> Pick the last 30 days of bytes from this ordered set.
>>> Find the moving average of bytes for the last date using a time period of 30.
>>> Find the standard deviation of the bytes for the final date using a time period of 30.
>>> Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k = 3]
>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.
>>> Here is the data structure for the dataset.
>>> package com.testing
>>> case class DailyDeviceAggregates (
>>>                         device_id: Integer,
>>>                         bytes: Long,
>>>                         eventdate: Integer
>>>                    ) extends Ordered[DailyDeviceAggregates] {
>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>     eventdate - that.eventdate
>>>   }
>>> }
>>> object DailyDeviceAggregates {
>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>     val c = logline.split(",")
>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>   }
>>> }
>>> The DeviceAnalyzer class looks like this:
>>> I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>>> 
>>> import com.testing.DailyDeviceAggregates
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.mllib.linalg.Vector
>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>> 
>>> import scala.util.Sorting
>>> 
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>> 
>>>     val logFile = args(0)
>>> 
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>> 
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>> 
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1  // This is the device ID
>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>> 
>>>       println(allaggregates)
>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>       println(allaggregates) // This does not work - results are not sorted !!
>>> 
>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>> 
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>> 
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>     })
>>> 
>>>     sc.stop()
>>>   }
>>> }
>>> I would really appreciate it if someone could suggest improvements for the following:
>>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>> 
>>> Thanks in advance for your help.
>>> 
>>> Anupam Bagchi
>>>  
>>> 
>> 
>> 
> 
> 


Re: Finding moving average using Spark and Scala

Posted by Feynman Liang <fl...@databricks.com>.
Dimensions mismatch when adding new sample. Expecting 8 but got 14.

Make sure all the vectors you are summarizing over have the same dimension.

Why would you want to write a MultivariateOnlineSummary object (which can
be represented with a couple Double's) into a distributed filesystem like
HDFS?
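One way to avoid the mismatch entirely is to keep one summarizer per device and feed it one-dimensional samples (one per day), so devices with different numbers of active days never collide; the unpacked mean and stddev can then be saved as text. A rough sketch (sample values and output path are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

object PerDeviceSummarySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Per-Device Summary Sketch"))
    // (device_id, bytes-for-one-day) pairs; in the real job this comes from the parsed log lines.
    val perDay = sc.parallelize(Seq((14066921, 1907.0), (14066921, 1906.0), (6522013, 2349.0)))

    // One MultivariateOnlineSummarizer per device, each sample a 1-element vector.
    val summaries = perDay.aggregateByKey(new MultivariateOnlineSummarizer())(
      (summ, bytes) => summ.add(Vectors.dense(bytes)),
      (s1, s2)      => s1.merge(s2))

    summaries.mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))
             .map { case (id, (mean, stddev)) => s"$id,$mean,$stddev" }
             .saveAsTextFile(args(0))   // args(0): HDFS output directory
    sc.stop()
  }
}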

On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com
> wrote:

> Thank you Feynman for the lead.
>
> I was able to modify the code using clues from the RegressionMetrics
> example. Here is what I got now.
>
> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
> // Calculate statistics based on bytes-transferred
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> println(deviceIdsMap.collect().deep.mkString("\n"))
>
> val summary: MultivariateStatisticalSummary = {
>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>     case (deviceId, allaggregates) => Vectors.dense({
>       val sortedAggregates = allaggregates.toArray
>       Sorting.quickSort(sortedAggregates)
>       sortedAggregates.map(dda => dda.bytes.toDouble)
>     })
>   }.aggregate(new MultivariateOnlineSummarizer())(
>       (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>     )
>   summary
> }
>
> It compiles fine. But I am now getting an exception as follows at Runtime.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
> failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
> when adding new sample. Expecting 8 but got 14.
>         at scala.Predef$.require(Predef.scala:233)
>         at
> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>         at
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>         at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:722)
>
> Can’t tell where exactly I went wrong. Also, how do I take the
> MultivariateOnlineSummary object and write it to HDFS? I have the
> MultivariateOnlineSummary object with me, but I really need an RDD to call
> saveAsTextFile() on it.
>
> Anupam Bagchi
>
> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fl...@databricks.com> wrote:
>
> A good example is RegressionMetrics
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
> use of MultivariateOnlineSummarizer to aggregate statistics across
> labels and residuals; take a look at how aggregateByKey is used there.
>
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <
> anupam_bagchi@rocketmail.com> wrote:
>
>> Thank you Feynman for your response. Since I am very new to Scala I may
>> need a bit more hand-holding at this stage.
>>
>> I have been able to incorporate your suggestion about sorting - and it
>> now works perfectly. Thanks again for that.
>>
>> I tried to use your suggestion of using MultivariateOnlineSummarizer, but
>> could not proceed further. For each deviceid (the key) my goal is to get a
>> vector of doubles on which I can query the mean and standard deviation. Now
>> because RDDs are immutable, I cannot use a foreach loop to iterate through
>> the groupby results and individually add the values in an RDD - Spark does
>> not allow that. I need to apply the RDD functions directly on the entire
>> set to achieve the transformations I need. This is where I am faltering
>> since I am not used to the lambda expressions that Scala uses.
>>
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>>
>>     val logFile = args(0)
>>
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>
>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>>
>>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>>
>>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot
>>
>>     // add individually to an immutable RDD
>>
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1  // This is the device ID
>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>
>>       val sortedaggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedaggregates)
>>
>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>>     })
>>
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>
>>
>>     sc.stop()
>>   }
>> }
>>
>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
>> Thanks a lot for your help.
>>
>> Anupam Bagchi
>>
>>
>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fl...@databricks.com>
>> wrote:
>>
>> The call to Sorting.quicksort is not working. Perhaps I am calling it the
>>> wrong way.
>>
>> allaggregates.toArray allocates and creates a new array separate from
>> allaggregates; that new array is what Sorting.quickSort sorts, while
>> allaggregates itself is left unchanged. Try:
>> val sortedAggregates = allaggregates.toArray
>> Sorting.quickSort(sortedAggregates)
>>
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary
>>> to calculate the statistical values.
>>
>> MultivariateStatisticalSummary is a trait (similar to a Java interface);
>> you probably want to use MultivariateOnlineSummarizer.
>>
>>> For that I would need to keep all my intermediate values as RDD so that
>>> I can directly use the RDD methods to do the job.
>>
>> Correct; you would do an aggregate using the add and merge functions
>> provided by MultivariateOnlineSummarizer
>>
>>> At the end I also need to write the results to HDFS for which there is a
>>> method provided on the RDD class to do so, which is another reason I would
>>> like to retain everything as RDD.
>>
>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
>> or you could unpack the relevant statistics from
>> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
>> and then write.
>>
>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
>> anupam_bagchi@rocketmail.com> wrote:
>>
>>> I have to do the following tasks on a dataset using Apache Spark with
>>> Scala as the programming language:
>>>
>>>    1. Read the dataset from HDFS. A few sample lines look like this:
>>>
>>>  deviceid,bytes,eventdate
>>>  15590657,246620,20150630
>>>  14066921,1907,20150621
>>>  14066921,1906,20150626
>>>  6522013,2349,20150626
>>>  6522013,2525,20150613
>>>
>>>
>>>    1. Group the data by device id. Thus we now have a map of deviceid
>>>    => (bytes,eventdate)
>>>    2. For each device, sort the set by eventdate. We now have an
>>>    ordered set of bytes based on eventdate for each device.
>>>    3. Pick the last 30 days of bytes from this ordered set.
>>>    4. Find the moving average of bytes for the last date using a time
>>>    period of 30.
>>>    5. Find the standard deviation of the bytes for the final date using
>>>    a time period of 30.
>>>    6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>>>    [Assume k = 3]
>>>
>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has
>>> to run on a billion rows finally.
>>> Here is the data structure for the dataset.
>>>
>>> package com.testing
>>> case class DailyDeviceAggregates (
>>>                         device_id: Integer,
>>>                         bytes: Long,
>>>                         eventdate: Integer
>>>                    ) extends Ordered[DailyDeviceAggregates] {
>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>     eventdate - that.eventdate
>>>   }
>>> }
>>> object DailyDeviceAggregates {
>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>     val c = logline.split(",")
>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>   }
>>> }
>>>
>>> The DeviceAnalyzer class looks like this:
>>> I have a very crude implementation that does the job, but it is not up
>>> to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>>> basic. Here is what I have now:
>>>
>>> import com.testing.DailyDeviceAggregates
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.mllib.linalg.Vector
>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>> import scala.util.Sorting
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     val logFile = args(0)
>>>
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1  // This is the device ID
>>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>>
>>>       println(allaggregates)
>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>       println(allaggregates) // This does not work - results are not sorted !!
>>>
>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>>
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>     })
>>>
>>>     sc.stop()
>>>   }
>>> }
>>>
>>> I would really appreciate it if someone can suggest improvements for the
>>> following:
>>>
>>>    1. The call to Sorting.quicksort is not working. Perhaps I am
>>>    calling it the wrong way.
>>>    2. I would like to use the Spark mllib class
>>>    MultivariateStatisticalSummary to calculate the statistical values.
>>>    3. For that I would need to keep all my intermediate values as RDD
>>>    so that I can directly use the RDD methods to do the job.
>>>    4. At the end I also need to write the results to HDFS for which
>>>    there is a method provided on the RDD class to do so, which is another
>>>    reason I would like to retain everything as RDD.
>>>
>>>
>>> Thanks in advance for your help.
>>>
>>> Anupam Bagchi
>>>
>>>
>>
>>
>>
>
>

Re: Finding moving average using Spark and Scala

Posted by Anupam Bagchi <an...@rocketmail.com>.
Thank you Feynman for the lead.

I was able to modify the code using clues from the RegressionMetrics example. Here is what I got now.

val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes-transferred
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
println(deviceIdsMap.collect().deep.mkString("\n"))

val summary: MultivariateStatisticalSummary = {
  val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
    case (deviceId, allaggregates) => Vectors.dense({
      val sortedAggregates = allaggregates.toArray
      Sorting.quickSort(sortedAggregates)
      sortedAggregates.map(dda => dda.bytes.toDouble)
    })
  }.aggregate(new MultivariateOnlineSummarizer())(
      (summary, v) => summary.add(v),  // Not sure if this is really what I want, it just came from the example
      (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
    )
  summary
}
It compiles fine. But I am now getting an exception as follows at Runtime.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Can’t tell where exactly I went wrong. Also, how do I take the MultivariateOnlineSummarizer object and write it to HDFS? I have the MultivariateOnlineSummarizer object with me, but I really need an RDD to call saveAsTextFile() on it.
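
My guess is that the mismatch happens because each device has a different number of days, so the dense vectors I am building have different lengths, while MultivariateOnlineSummarizer expects every sample to have the same dimension. But I am not sure. For writing the result out, the only thing I can think of is wrapping it in an RDD myself, something like this (just a sketch, and probably not the right approach):

val resultRDD = sc.parallelize(Seq((summary.mean, summary.variance)))
resultRDD.saveAsTextFile(args(1))  // assuming an output path is passed as a second argument

Is there a better way?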

Anupam Bagchi
(c) 408.431.0780 (h) 408-873-7909

> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fl...@databricks.com> wrote:
> 
> A good example is RegressionMetrics <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregateByKey is used there.
> 
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
> Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.
> 
> I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.
> 
> I tried to use your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Now because RDDs are immutable, I cannot use a foreach loop to iterate through the groupby results and individually add the values in an RDD - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering since I am not used to the lambda expressions that Scala uses.
> 
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
> 
>     val logFile = args(0)
> 
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot 
>     // add individually to an immutable RDD
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
> 
>       val sortedaggregates = allaggregates.toArray
>       Sorting.quickSort(sortedaggregates)
> 
>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
> 
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
> 
>     })
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
> 
> 
>     sc.stop()
>   }
> }
> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks a lot for your help.
> 
> Anupam Bagchi
> 
> 
>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fliang@databricks.com <ma...@databricks.com>> wrote:
>> 
>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>> allaggregates.toArray allocates and creates a new array separate from allaggregates; that new array is what Sorting.quickSort sorts, while allaggregates itself is left unchanged. Try:
>> val sortedAggregates = allaggregates.toArray
>> Sorting.quickSort(sortedAggregates)
>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer. 
>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer 
>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
>> 
>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
>> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
>> Read the dataset from HDFS. A few sample lines look like this:
>> deviceid,bytes,eventdate
>> 15590657,246620,20150630
>> 14066921,1907,20150621
>> 14066921,1906,20150626
>> 6522013,2349,20150626
>> 6522013,2525,20150613
>> Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
>> For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
>> Pick the last 30 days of bytes from this ordered set.
>> Find the moving average of bytes for the last date using a time period of 30.
>> Find the standard deviation of the bytes for the final date using a time period of 30.
>> Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k = 3]
>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.
>> Here is the data structure for the dataset.
>> package com.testing
>> case class DailyDeviceAggregates (
>>                         device_id: Integer,
>>                         bytes: Long,
>>                         eventdate: Integer
>>                    ) extends Ordered[DailyDeviceAggregates] {
>>   def compare(that: DailyDeviceAggregates): Int = {
>>     eventdate - that.eventdate
>>   }
>> }
>> object DailyDeviceAggregates {
>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>     val c = logline.split(",")
>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>   }
>> }
>> The DeviceAnalyzer class looks like this:
>> I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>> 
>> import com.testing.DailyDeviceAggregates
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.mllib.linalg.Vector
>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> 
>> import scala.util.Sorting
>> 
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>> 
>>     val logFile = args(0)
>> 
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>> 
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> 
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1  // This is the device ID
>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>> 
>>       println(allaggregates)
>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>       println(allaggregates) // This does not work - results are not sorted !!
>> 
>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>> 
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>> 
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>     })
>> 
>>     sc.stop()
>>   }
>> }
>> I would really appreciate it if someone can suggest improvements for the following:
>> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>> 
>> Thanks in advance for your help.
>> 
>> Anupam Bagchi
>>  
>> 
> 
> 


Re: Finding moving average using Spark and Scala

Posted by Feynman Liang <fl...@databricks.com>.
A good example is RegressionMetrics
<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
use of MultivariateOnlineSummarizer to aggregate statistics across
labels and residuals; take a look at how aggregateByKey is used there.
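
Adapted to your per-device case, it would look roughly like the sketch below (untested; it keeps one summarizer per device by feeding each day's byte count in as a 1-dimensional vector, and assumes your deviceAggregateLogs RDD from earlier):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// One 1-dimensional sample per (device, day). If you only want the last 30
// days, filter deviceAggregateLogs on eventdate before this step.
val summariesByDevice = deviceAggregateLogs
  .map(d => (d.device_id, Vectors.dense(d.bytes.toDouble)))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summarizer, v) => summarizer.add(v),
    (s1, s2) => s1.merge(s2))

// RDD[(device_id, (mean - 3*stddev, mean + 3*stddev))]; this is a normal RDD,
// so you can saveAsTextFile() it.
val bands = summariesByDevice.mapValues { s =>
  val mean = s.mean(0)
  val stddev = math.sqrt(s.variance(0))
  (mean - 3 * stddev, mean + 3 * stddev)
}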

On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com
> wrote:

> Thank you Feynman for your response. Since I am very new to Scala I may
> need a bit more hand-holding at this stage.
>
> I have been able to incorporate your suggestion about sorting - and it now
> works perfectly. Thanks again for that.
>
> I tried to use your suggestion of using MultivariateOnlineSummarizer, but
> could not proceed further. For each deviceid (the key) my goal is to get a
> vector of doubles on which I can query the mean and standard deviation. Now
> because RDDs are immutable, I cannot use a foreach loop to iterate through
> the groupby results and individually add the values in an RDD - Spark does
> not allow that. I need to apply the RDD functions directly on the entire
> set to achieve the transformations I need. This is where I am faltering
> since I am not used to the lambda expressions that Scala uses.
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
>
>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD
>
>     // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot
>
>     // add individually to an immutable RDD
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>
>       val sortedaggregates = allaggregates.toArray
>       Sorting.quickSort(sortedaggregates)
>
>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>     })
>
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>
>
>     sc.stop()
>   }
> }
>
> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
> Thanks a lot for your help.
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fl...@databricks.com> wrote:
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
>> wrong way.
>
> allaggregates.toArray allocates and creates a new array separate from
> allaggregates; that new array is what Sorting.quickSort sorts, while
> allaggregates itself is left unchanged. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
>
>> I would like to use the Spark mllib class MultivariateStatisticalSummary
>> to calculate the statistical values.
>
> MultivariateStatisticalSummary is a trait (similar to a Java interface);
> you probably want to use MultivariateOnlineSummarizer.
>
>> For that I would need to keep all my intermediate values as RDD so that I
>> can directly use the RDD methods to do the job.
>
> Correct; you would do an aggregate using the add and merge functions
> provided by MultivariateOnlineSummarizer
>
>> At the end I also need to write the results to HDFS for which there is a
>> method provided on the RDD class to do so, which is another reason I would
>> like to retain everything as RDD.
>
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
> or you could unpack the relevant statistics from
> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
> and then write.
>
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
> anupam_bagchi@rocketmail.com> wrote:
>
>> I have to do the following tasks on a dataset using Apache Spark with
>> Scala as the programming language:
>>
>>    1. Read the dataset from HDFS. A few sample lines look like this:
>>
>>  deviceid,bytes,eventdate
>>  15590657,246620,20150630
>>  14066921,1907,20150621
>>  14066921,1906,20150626
>>  6522013,2349,20150626
>>  6522013,2525,20150613
>>
>>
>>    1. Group the data by device id. Thus we now have a map of deviceid =>
>>    (bytes,eventdate)
>>    2. For each device, sort the set by eventdate. We now have an ordered
>>    set of bytes based on eventdate for each device.
>>    3. Pick the last 30 days of bytes from this ordered set.
>>    4. Find the moving average of bytes for the last date using a time
>>    period of 30.
>>    5. Find the standard deviation of the bytes for the final date using
>>    a time period of 30.
>>    6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>>    [Assume k = 3]
>>
>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
>> run on a billion rows finally.
>> Here is the data structure for the dataset.
>>
>> package com.testing
>> case class DailyDeviceAggregates (
>>                         device_id: Integer,
>>                         bytes: Long,
>>                         eventdate: Integer
>>                    ) extends Ordered[DailyDeviceAggregates] {
>>   def compare(that: DailyDeviceAggregates): Int = {
>>     eventdate - that.eventdate
>>   }
>> }
>> object DailyDeviceAggregates {
>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>     val c = logline.split(",")
>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>   }
>> }
>>
>> The DeviceAnalyzer class looks like this:
>> I have a very crude implementation that does the job, but it is not up to
>> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>> basic. Here is what I have now:
>>
>> import com.testing.DailyDeviceAggregates
>> import org.apache.spark.{SparkContext, SparkConf}
>> import org.apache.spark.mllib.linalg.Vector
>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>> import scala.util.Sorting
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>>
>>     val logFile = args(0)
>>
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1  // This is the device ID
>>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>>
>>       println(allaggregates)
>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>       println(allaggregates) // This does not work - results are not sorted !!
>>
>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>>
>>       //val vector: Vector = Vectors.dense(byteValues)
>>       //println(vector)
>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>     })
>>
>>     sc.stop()
>>   }
>> }
>>
>> I would really appreciate it if someone can suggest improvements for the
>> following:
>>
>>    1. The call to Sorting.quicksort is not working. Perhaps I am calling
>>    it the wrong way.
>>    2. I would like to use the Spark mllib class
>>    MultivariateStatisticalSummary to calculate the statistical values.
>>    3. For that I would need to keep all my intermediate values as RDD so
>>    that I can directly use the RDD methods to do the job.
>>    4. At the end I also need to write the results to HDFS for which
>>    there is a method provided on the RDD class to do so, which is another
>>    reason I would like to retain everything as RDD.
>>
>>
>> Thanks in advance for your help.
>>
>> Anupam Bagchi
>>
>>
>
>
>

Re: Finding moving average using Spark and Scala

Posted by Anupam Bagchi <an...@rocketmail.com>.
Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.

I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.

I tried to use your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Now because RDDs are immutable, I cannot use a foreach loop to iterate through the groupby results and individually add the values in an RDD - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering since I am not used to the lambda expressions that Scala uses.
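
Conceptually, the shape I think I am after is something like this (just a sketch of my intent, not code I have working):

val byteVectorsByDevice = deviceAggregateLogs
  .groupBy(_.device_id)
  .mapValues { aggs =>
    val sorted = aggs.toArray
    Sorting.quickSort(sorted)
    Vectors.dense(sorted.map(_.bytes.toDouble))
  }
// i.e. an RDD[(Integer, Vector)] with one sorted byte-vector per device

My current attempt is below: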

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
    // All I need to do below is collect the vector of bytes for each device and store it in the RDD
    // The problem with the ‘foreach' approach below, is that it generates the vector values one at a time, which I cannot 
    // add individually to an immutable RDD
    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      val sortedaggregates = allaggregates.toArray
      Sorting.quickSort(sortedaggregates)

      val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

    })
      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)


    sc.stop()
  }
}
Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks a lot for your help.

Anupam Bagchi


> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fl...@databricks.com> wrote:
> 
> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> allaggregates.toArray allocates and creates a new array separate from allaggregates; that new array is what Sorting.quickSort sorts, while allaggregates itself is left unchanged. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer. 
> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer 
> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
> 
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bagchi@rocketmail.com <ma...@rocketmail.com>> wrote:
> I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
> Read the dataset from HDFS. A few sample lines look like this:
> deviceid,bytes,eventdate
> 15590657,246620,20150630
> 14066921,1907,20150621
> 14066921,1906,20150626
> 6522013,2349,20150626
> 6522013,2525,20150613
> Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
> For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
> Pick the last 30 days of bytes from this ordered set.
> Find the moving average of bytes for the last date using a time period of 30.
> Find the standard deviation of the bytes for the final date using a time period of 30.
> Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k = 3]
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion rows finally.
> Here is the data structure for the dataset.
> package com.testing
> case class DailyDeviceAggregates (
>                         device_id: Integer,
>                         bytes: Long,
>                         eventdate: Integer
>                    ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
> The DeviceAnalyzer class looks like this:
> I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
> 
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> 
> import scala.util.Sorting
> 
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
> 
>     val logFile = args(0)
> 
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> 
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
> 
>       println(allaggregates)
>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       println(allaggregates) // This does not work - results are not sorted !!
> 
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
> 
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
> 
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
> 
>     sc.stop()
>   }
> }
> I would really appreciate it if someone can suggest improvements for the following:
> The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
> 
> Thanks in advance for your help.
> 
> Anupam Bagchi
>  
> 


Re: Finding moving average using Spark and Scala

Posted by Feynman Liang <fl...@databricks.com>.
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
> wrong way.

allaggregates.toArray allocates and creates a new array separate from
allaggregates; that new array is what Sorting.quickSort sorts, while
allaggregates itself is left unchanged. Try:
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)
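Since DailyDeviceAggregates extends Ordered, this should work as well:
val sortedAggregates = allaggregates.toArray.sorted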

> I would like to use the Spark mllib class MultivariateStatisticalSummary
> to calculate the statistical values.

MultivariateStatisticalSummary is a trait (similar to a Java interface);
you probably want to use MultivariateOnlineSummarizer.
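Basic usage is something like (sketch):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val summarizer = new MultivariateOnlineSummarizer()
summarizer.add(Vectors.dense(1.0, 2.0))
summarizer.add(Vectors.dense(3.0, 4.0))
println(summarizer.mean)      // column-wise means
println(summarizer.variance)  // column-wise variances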

> For that I would need to keep all my intermediate values as RDD so that I
> can directly use the RDD methods to do the job.

Correct; you would do an aggregate using the add and merge functions
provided by MultivariateOnlineSummarizer
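For example, for an RDD[Vector] (say, vectors) where every vector has the same length:

val summary = vectors.aggregate(new MultivariateOnlineSummarizer())(
  (summarizer, v) => summarizer.add(v),
  (s1, s2) => s1.merge(s2))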

> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.

You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
or you could unpack the relevant statistics from
MultivariateOnlineSummarizer into an array/tuple using a mapValues first
and then write.
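For example (sketch; summaries stands for that RDD, and the output path is up to you):

summaries
  .mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))
  .saveAsTextFile("hdfs:///path/to/output")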

On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
anupam_bagchi@rocketmail.com> wrote:

> I have to do the following tasks on a dataset using Apache Spark with
> Scala as the programming language:
>
>    1. Read the dataset from HDFS. A few sample lines look like this:
>
>  deviceid,bytes,eventdate
>  15590657,246620,20150630
>  14066921,1907,20150621
>  14066921,1906,20150626
>  6522013,2349,20150626
>  6522013,2525,20150613
>
>
>    1. Group the data by device id. Thus we now have a map of deviceid =>
>    (bytes,eventdate)
>    2. For each device, sort the set by eventdate. We now have an ordered
>    set of bytes based on eventdate for each device.
>    3. Pick the last 30 days of bytes from this ordered set.
>    4. Find the moving average of bytes for the last date using a time
>    period of 30.
>    5. Find the standard deviation of the bytes for the final date using a
>    time period of 30.
>    6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>    [Assume k = 3]
>
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
> run on a billion rows finally.
> Here is the data structure for the dataset.
>
> package com.testing
> case class DailyDeviceAggregates (
>                         device_id: Integer,
>                         bytes: Long,
>                         eventdate: Integer
>                    ) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
>     eventdate - that.eventdate
>   }
> }
> object DailyDeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>     val c = logline.split(",")
>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
>
> The DeviceAnalyzer class looks like this:
> I have a very crude implementation that does the job, but it is not up to
> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
> basic. Here is what I have now:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import scala.util.Sorting
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>     val sc = new SparkContext(sparkConf)
>
>     val logFile = args(0)
>
>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
>     // Calculate statistics based on bytes
>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
>     deviceIdsMap.foreach(a => {
>       val device_id = a._1  // This is the device ID
>       val allaggregates = a._2  // This is an array of all device-aggregates for this device
>
>       println(allaggregates)
>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>       println(allaggregates) // This does not work - results are not sorted !!
>
>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>       val count = byteValues.count(A => true)
>       val sum = byteValues.sum
>       val xbar = sum / count
>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>       val vector: Vector = Vectors.dense(byteValues)
>       println(vector)
>       println(device_id + "," + xbar + "," + stddev)
>
>       //val vector: Vector = Vectors.dense(byteValues)
>       //println(vector)
>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>     })
>
>     sc.stop()
>   }
> }
>
> I would really appreciate it if someone can suggest improvements for the
> following:
>
>    1. The call to Sorting.quicksort is not working. Perhaps I am calling
>    it the wrong way.
>    2. I would like to use the Spark mllib class
>    MultivariateStatisticalSummary to calculate the statistical values.
>    3. For that I would need to keep all my intermediate values as RDD so
>    that I can directly use the RDD methods to do the job.
>    4. At the end I also need to write the results to HDFS for which there
>    is a method provided on the RDD class to do so, which is another reason I
>    would like to retain everything as RDD.
>
>
> Thanks in advance for your help.
>
> Anupam Bagchi
>
>