Posted to user@spark.apache.org by "PAULI, KEVIN CHRISTIAN [AG-Contractor/1000]" <ke...@monsanto.com> on 2015/07/17 18:21:22 UTC

streaming and piping to R, sending all data in window to pipe()

Spark newbie here, using Spark 1.3.1.

I’m consuming a stream and trying to pipe the data from the entire window to R for analysis.  The R algorithm needs the entire dataset from the stream (everything in the window) in order to function properly; it can’t be broken up.

So I tried doing a coalesce(1) before calling pipe(), but it still seems to be breaking up the data and invoking R multiple times with small pieces of data.  Is there some other approach I should try?

Here’s a small snippet:

    val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
      .window(duration)
    inputs.foreachRDD {
      windowRdd => {
        if (windowRdd.count() > 0) processWindow(windowRdd)
      }
    }

...

  def processWindow(windowRdd: RDD[String]) = {
    // call R script to process data
    windowRdd.coalesce(1)
    val outputsRdd: RDD[String] = windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
    outputsRdd.cache()

    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }

...


Re: streaming and piping to R, sending all data in window to pipe()

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Did you try inputs.repartition(1).foreachRDD(..)?
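
(For illustration, a minimal sketch of that suggestion applied to the snippet above; ssc, mqttBrokerUrl, inputsTopic, duration, rScript and processOutputs are assumed to be defined as in the original post. Note that RDD transformations such as coalesce(1) return a new RDD rather than modifying windowRdd in place, so in the original processWindow the coalesced RDD is discarded and pipe() still runs once per original partition.)

    import java.nio.file.Paths
    import org.apache.spark.SparkFiles
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.mqtt.MQTTUtils

    // Sketch only: same MQTT source and window as in the original snippet (assumed).
    val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
      .window(duration)

    // Collapse each windowed RDD to a single partition before piping, so the
    // external R script is forked once per batch and receives the whole window.
    inputs.repartition(1).foreachRDD { windowRdd =>
      if (windowRdd.count() > 0) {
        val outputsRdd: RDD[String] =
          windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
        outputsRdd.cache()
        if (outputsRdd.count() > 0) processOutputs(outputsRdd)
      }
    }

With a single partition, all of the window's data flows through one task on one executor, so this approach only works if the whole window fits comfortably on a single node.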

Thanks
Best Regards

On Fri, Jul 17, 2015 at 9:51 PM, PAULI, KEVIN CHRISTIAN
[AG-Contractor/1000] <ke...@monsanto.com> wrote:

>  Spark newbie here, using Spark 1.3.1.
>
>  I’m consuming a stream and trying to pipe the data from the entire
> window to R for analysis.  The R algorithm needs the entire dataset from
> the stream (everything in the window) in order to function properly; it
> can’t be broken up.
>
>  So I tried doing a coalesce(1) before calling pipe(), but it still seems
> to be breaking up the data and invoking R multiple times with small pieces
> of data.  Is there some other approach I should try?
>
>  Here’s a small snippet:
>
>      val inputs: DStream[String] = MQTTUtils.createStream(ssc,
> mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
>       .window(duration)
>     inputs.foreachRDD {
>       windowRdd => {
>         if (windowRdd.count() > 0) processWindow(windowRdd)
>       }
>     }
>
>  ...
>
>    def processWindow(windowRdd: RDD[String]) = {
>     // call R script to process data
>     windowRdd.coalesce(1)
>     val outputsRdd: RDD[String] =
> windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
>     outputsRdd.cache()
>
>      if (outputsRdd.count() > 0) processOutputs(outputsRdd)
>   }
>
>  ...
>