Posted to user@spark.apache.org by Anfernee Xu <an...@gmail.com> on 2015/10/26 17:16:34 UTC
Spark Streaming: how to use StreamingContext.queueStream with existing RDD
Hi,
Here's my situation: I have an offline dataset that I have loaded into Spark
as an RDD, and I want to turn it into a virtual data stream that feeds
Spark Streaming. My code looks like this:
// sort offline data by time, the dataset spans 2 hours
1) JavaRDD sortedByTime = offlineDataRDD.sortBy( );
// compute a list of JavaRDDs; each JavaRDD holds the data that falls in the
// same time bucket, for example 5 minutes
2) List<JavaRDD<Row>> virtualStreamRdd = ?
Queue<JavaRDD<Row>> queue = Queues.newLinkedBlockingQueue();
queue.addAll(virtualStreamRdd);
/*
* Create DStream from the queue
*/
3) final JavaDStream<Row> rowDStream =
streamingContext.queueStream(queue);
Currently I'm stuck on step 2); any suggestions are appreciated.
Thanks
--
--Anfernee
Re: Spark Streaming: how to use StreamingContext.queueStream with existing RDD
Posted by Dean Wampler <de...@gmail.com>.
Check out StreamingContext.queueStream
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext)
dean
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com
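For step 2) in the original question, one possible approach is to compute the
bucket boundaries up front and then build one RDD per bucket by filtering on
the record timestamp. The sketch below covers only the boundary computation in
plain Java, so it runs without Spark on the classpath. The names TimeBuckets,
Window, and windows are hypothetical, and so is the timestampOf(row) accessor
mentioned in the comments; none of them come from the thread or from the Spark
API. Treat it as an illustration under those assumptions, not as the
recommended method.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TimeBuckets {

    /** Half-open time window [fromMillis, toMillis). Serializable so that a
     *  Spark closure could capture it (an assumption about how it is used). */
    public static final class Window implements Serializable {
        public final long fromMillis;
        public final long toMillis;

        Window(long fromMillis, long toMillis) {
            this.fromMillis = fromMillis;
            this.toMillis = toMillis;
        }

        /** True if the timestamp falls inside this window. */
        public boolean contains(long timestampMillis) {
            return timestampMillis >= fromMillis && timestampMillis < toMillis;
        }
    }

    /** Splits [startMillis, endMillis) into consecutive windows of bucketMillis;
     *  the last window is truncated at endMillis. */
    public static List<Window> windows(long startMillis, long endMillis, long bucketMillis) {
        if (bucketMillis <= 0) {
            throw new IllegalArgumentException("bucketMillis must be positive");
        }
        List<Window> result = new ArrayList<>();
        for (long from = startMillis; from < endMillis; from += bucketMillis) {
            result.add(new Window(from, Math.min(from + bucketMillis, endMillis)));
        }
        return result;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long twoHours = 2 * 60 * 60 * 1000L;

        // A dataset spanning 2 hours, cut into 5-minute buckets.
        List<Window> buckets = windows(0L, twoHours, fiveMinutes);
        System.out.println(buckets.size() + " windows"); // prints "24 windows"

        // With Spark available, each window could become one RDD, for example:
        //   for (Window w : buckets) {
        //       queue.add(offlineDataRDD.filter(row -> w.contains(timestampOf(row))));
        //   }
        // timestampOf(row) is a hypothetical accessor for the record's event time.
    }
}
```

Because endMillis is exclusive, pass the largest timestamp plus one so the
last record is not dropped. Each filter call scans the whole source RDD, so
caching the source before the loop is likely worthwhile. With this approach
the sortBy in step 1) is not needed to assign records to buckets.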