Posted to user@spark.apache.org by Jean-Pascal Billaud <jp...@tellapart.com> on 2014/12/14 19:50:45 UTC

DStream demultiplexer based on a key

Hey,

I am doing an experiment with Spark Streaming that consists of moving data
from Kafka to S3 locations while partitioning by date. I have already
looked into LinkedIn's Camus and Pinterest's Secor, and while both are
workable solutions, it feels like Spark Streaming should be on par with
them without having to manage yet another application in our stack, since
we already have a Spark Streaming cluster in production.

So what I am trying to do is really very simple. Each message in Kafka is
Thrift-serialized, and the corresponding Thrift objects have a timestamp
field. What I'd like to do is something like this:

JavaPairDStream<Void, Log> stream = KafkaUtils.createRawStream(...);
// key each record by its date string
JavaPairDStream<String, Log> datedStream = stream.mapToPair(
    new PairFunction<Tuple2<Void, Log>, String, Log>() {
      public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
        return new Tuple2<>(tuple._2().getDate(), tuple._2());
      }
    });

At this point, I'd like to do some partitioning on the resulting DStream to
get multiple DStreams, each with a single common date string... So for
instance, one DStream would have all the entries from 12/01 and another all
the entries from 12/02. Once I have this list of DStreams, I would basically
call saveAsObjectFiles() on each of them. Unfortunately, I did not find a
way to demultiplex a DStream based on a key. Obviously the family of reduce
operations does some of that, but the result is still a single DStream.
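
For what it's worth, if the dates were known up front I could just filter the
keyed stream once per date, along these lines (a rough Scala sketch, assuming
a DStream[(String, Log)] keyed by date like datedStream above; the S3 prefix
is made up):

val dates = Seq("2014-12-01", "2014-12-02")            // dates known in advance
val perDateStreams = dates.map { d =>
  d -> datedStream.filter { case (date, _) => date == d }
}
perDateStreams.foreach { case (d, s) =>
  // hypothetical S3 prefix; saveAsObjectFiles writes one output per batch
  s.saveAsObjectFiles("s3n://my-bucket/logs/" + d + "/events")
}

But the dates come from the data itself, so the set of keys is not known up
front.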

An alternative approach would be to call foreachRDD() on the DStream,
demultiplex the entries into multiple new RDDs based on the timestamp so
that entries with the same day date end up in the same RDD, and finally
call saveAsObjectFile() on each of them. I am not sure whether I can use
parallelize() to create those RDDs?
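
Roughly, I am picturing something like this (a Scala sketch with hard-coded
example dates and made-up S3 paths; filter() on the batch RDD already returns
a new RDD per bucket, so maybe parallelize() is not even needed):

datedStream.foreachRDD { (rdd, time) =>
  rdd.cache()  // the RDD is traversed once per bucket
  val dec01 = rdd.filter { case (date, _) => date == "2014-12-01" }
  val dec02 = rdd.filter { case (date, _) => date == "2014-12-02" }
  // hypothetical S3 paths, made unique per batch with the batch time
  dec01.saveAsObjectFile(s"s3n://my-bucket/logs/2014-12-01/${time.milliseconds}")
  dec02.saveAsObjectFile(s"s3n://my-bucket/logs/2014-12-02/${time.milliseconds}")
  rdd.unpersist()
}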

Another thing that I am going to experiment with is a much longer batch
interval. I am talking minutes, because I don't want to end up with a bunch
of tiny files. I might simply use a bigger Duration or use one of the window
operations. I am not sure whether anybody has tried running Spark Streaming
that way.
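
Concretely, I guess the former just means passing a minutes-long batch
Duration when creating the StreamingContext (a minimal sketch; the app name
is made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("kafka-to-s3")  // hypothetical app name
val ssc = new StreamingContext(conf, Minutes(5))      // one batch every 5 minutes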

Any thoughts on that would be much appreciated,

Thanks!

Re: DStream demultiplexer based on a key

Posted by Gerard Maas <ge...@gmail.com>.
I haven't done anything other than performance tuning on Spark Streaming for
the past few weeks. rdd.cache() makes a huge difference. It is a must in this
case, where you want to iterate over the same RDD several times.

Intuitively, I also thought that all the data was already in memory so that
it wouldn't make a difference, and I was very surprised to see stage times
drop from seconds to milliseconds when cache() was present.

Our intervals are 10-12 seconds long. I've not tried batches of minutes yet.
Probably the best way would be to use window functions for that, although
something in the 1-5 minute range should be doable as well.
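
Roughly, something like this (a sketch, assuming the date-keyed stream from
your map(), here called datedStream, and a made-up S3 prefix):

import org.apache.spark.streaming.Minutes

// 5-minute tumbling window over the short batches:
// window length == slide interval, so windows do not overlap
val windowed = datedStream.window(Minutes(5), Minutes(5))
windowed.saveAsObjectFiles("s3n://my-bucket/logs/window")  // hypothetical prefix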

-kr, Gerard.





Re: DStream demultiplexer based on a key

Posted by Jean-Pascal Billaud <jp...@tellapart.com>.
Ah! That sounds very much like what I need. A very basic question (most
likely): why is rdd.cache() critical? Isn't it already the case that in
Spark Streaming the DStream data is kept in memory anyway?

Also, any experience with minutes-long batch intervals?

Thanks for the quick answer!


Re: DStream demultiplexer based on a key

Posted by Gerard Maas <ge...@gmail.com>.
Hi Jean-Pascal,

At Virdata we do a similar thing to 'bucketize' our data into different
keyspaces in Cassandra.

The basic construction would be to filter the DStream (or the underlying
RDD) once per key and then apply the usual storage operations on each new
data set.
Given that, in your case, the keys come from the data within the stream
itself, you will first need to collect those keys in order to create the
buckets.

Something like this:

val kafkaStream = ???
kafkaStream.foreachRDD { rdd =>
  rdd.cache() // very important!
  // key(...) is a function to get the desired key from each record
  val keys = rdd.map(elem => key(elem)).distinct.collect()
  keys.foreach { k =>
    rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
  }
  rdd.unpersist()
}
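
For your thrift Log records, key(...) could be as simple as this (assuming
the stream elements are (Void, Log) tuples and getDate() returns the day
string, as in your map() earlier):

// hypothetical helper; adjust to the actual element type of the Kafka stream
def key(elem: (Void, Log)): String = elem._2.getDate()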

-kr, Gerard.



