Posted to user@spark.apache.org by Anfernee Xu <an...@gmail.com> on 2015/10/26 17:16:34 UTC

Spark Streaming: how to use StreamingContext.queueStream with existing RDD

Hi,

Here's my situation: I have an offline dataset loaded into Spark as an
RDD, and I want to turn it into a virtual data stream that feeds Spark
Streaming. My code looks like this:


    // Sort the offline data by time; the dataset spans 2 hours.
 1) JavaRDD<Row> sortedByTime = offlineDataRDD.sortBy( );

    // Compute a list of JavaRDDs, where each JavaRDD holds the data
    // in the same time bucket, for example 5 minutes.
 2) List<JavaRDD<Row>> virtualStreamRdd = ?

    Queue<JavaRDD<Row>> queue = Queues.newLinkedBlockingQueue();
    queue.addAll(virtualStreamRdd);

    // Create a DStream from the queue.
 3) final JavaDStream<Row> rowDStream = streamingContext.queueStream(queue);


I'm currently stuck on 2); any suggestions are appreciated.

Thanks

-- 
--Anfernee

Re: Spark Streaming: how to use StreamingContext.queueStream with existing RDD

Posted by Dean Wampler <de...@gmail.com>.
Check out StreamingContext.queueStream (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext
)
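
For step 2) of the question, one possible approach is to assign each record to a fixed-width time window and collect one batch per window, in time order. The sketch below shows only that bucketing logic in plain Java, without Spark, so it can be run on its own. The `Event` type, `bucketOf`, and `bucketize` are hypothetical names made up for illustration and are not part of any Spark API. With Spark, each window would presumably become one `sortedByTime.filter(...)` call that keeps the rows whose timestamp falls in that window, and caching the sorted RDD first would likely help, since every filter rescans it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;

class TimeBuckets {
    /** Hypothetical record type: a timestamp in epoch milliseconds plus a payload. */
    static final class Event {
        final long timestampMs;
        final String payload;

        Event(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Index of the fixed-width window that a timestamp falls into. */
    static long bucketOf(long timestampMs, long startMs, long widthMs) {
        return (timestampMs - startMs) / widthMs;
    }

    /**
     * Groups events into one batch per window, ordered by window index.
     * Windows are measured from the earliest timestamp; empty windows are skipped.
     */
    static List<List<Event>> bucketize(List<Event> events, long widthMs) {
        if (events.isEmpty()) {
            return new ArrayList<>();
        }
        long startMs = Long.MAX_VALUE;
        for (Event e : events) {
            startMs = Math.min(startMs, e.timestampMs);
        }
        // TreeMap keeps the windows sorted by index, so batches come out in time order.
        TreeMap<Long, List<Event>> byBucket = new TreeMap<>();
        for (Event e : events) {
            long bucket = bucketOf(e.timestampMs, startMs, widthMs);
            byBucket.computeIfAbsent(bucket, k -> new ArrayList<>()).add(e);
        }
        return new ArrayList<>(byBucket.values());
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        List<Event> events = new ArrayList<>();
        events.add(new Event(0L, "a"));        // window 0
        events.add(new Event(60_000L, "b"));   // window 0
        events.add(new Event(310_000L, "c"));  // window 1
        events.add(new Event(900_000L, "d"));  // window 3 (window 2 is empty)

        // The queue plays the role of the queue handed to queueStream in the question.
        Queue<List<Event>> queue = new LinkedBlockingQueue<>(bucketize(events, fiveMinutesMs));
        for (List<Event> batch : queue) {
            System.out.println(batch.size() + " event(s)");
        }
    }
}
```

Note that this version drops empty windows. If the replay should keep the original pacing, an empty batch would have to be emitted for each missing window instead.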

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com
