Posted to user@spark.apache.org by Abhishek Anand <ab...@gmail.com> on 2016/02/16 08:15:55 UTC

Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

I have a Kafka RDD and I need to save its offsets to a Cassandra table at the
beginning of each batch.

Basically, I need to write the offsets (the OffsetRange[] array below) that I
get inside foreachRDD to Cassandra. The javaFunctions API for writing to
Cassandra needs an RDD. How can I create an RDD from the offsets and write it
to a Cassandra table?


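public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
    kafkastream.foreachRDD((rdd, batchMilliSec) -> {
        // Offset ranges for this batch, one entry per Kafka partition.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // TODO: write offsets to Cassandra here.
        return null;
    });
}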


Thanks !!
Abhi

Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

Posted by Todd Nist <ts...@gmail.com>.
You could use the withSessionDo method of the Spark Cassandra Connector's
CassandraConnector to perform the simple insert:

CassandraConnector(conf).withSessionDo { session => session.execute(....) }

-Todd

(In reply to Cody Koeninger's message of Tue, Feb 16, 2016 at 11:01 AM.)

Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
Todd's withSessionDo suggestion seems like a better idea.

(In reply to Abhishek Anand's message of Wed, Feb 17, 2016 at 12:25 AM.)

Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

Posted by Abhishek Anand <ab...@gmail.com>.
Hi Cody,

I was able to do it using this piece of code:

kafkaStreamRdd.foreachRDD((rdd, batchMilliSec) -> {
    Date currentBatchTime = new Date(batchMilliSec.milliseconds());
    List<OffsetsClass> r = new ArrayList<>();
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (int partition = 0; partition < offsetRanges.length; partition++) {
        // Add offsets to the list
    }
    JavaSparkContext ctx = new JavaSparkContext(rdd.context());
    JavaRDD<OffsetsClass> currentBatchOffsets = ctx.parallelize(r);
    // Write the currentBatchOffsets RDD to Cassandra
    return null;
});


Is this the correct way to do it?


Thanks !!
Abhi

(In reply to Cody Koeninger's message of Tue, Feb 16, 2016 at 9:31 PM.)

Re: Saving Kafka Offsets to Cassandra at beginning of each batch in Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
You could use sc.parallelize... but the offsets are already available at
the driver, and they're a (hopefully) small enough amount of data that
it's probably more straightforward to just use the normal Cassandra client
to save them from the driver.
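
For what it's worth, here is a rough sketch of the driver-side approach,
written so it runs without Spark or Cassandra on the classpath. Everything in
it is a stand-in and an assumption on my part: Range only mirrors the fields
of Spark's OffsetRange, StatementExecutor is a hypothetical hook that real
code would back with a Cassandra session's execute call, and the keyspace,
table and column names are made up. The point is only that one small INSERT
per partition can be issued from the driver, with no RDD involved.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Driver-side sketch: turn one batch's offset ranges into INSERT parameters
 * and hand them to an executor, without building an RDD.
 * None of these types are Spark or Cassandra classes.
 */
class OffsetWriterSketch {

    /** Stand-in that mirrors the fields of Spark's OffsetRange. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    /** Hypothetical hook; real code would delegate to a Cassandra session. */
    interface StatementExecutor {
        void execute(String cql, Object... params);
    }

    // Hypothetical keyspace, table and column names.
    static final String INSERT_CQL =
        "INSERT INTO ks.kafka_offsets "
        + "(topic, kafka_partition, batch_time, from_offset, until_offset) "
        + "VALUES (?, ?, ?, ?, ?)";

    /** Builds one parameter row per Kafka partition in the batch. */
    static List<Object[]> toRows(Range[] ranges, long batchTimeMs) {
        List<Object[]> rows = new ArrayList<>();
        for (Range r : ranges) {
            rows.add(new Object[] {
                r.topic, r.partition, batchTimeMs, r.fromOffset, r.untilOffset});
        }
        return rows;
    }

    /** Issues one INSERT per row and returns the number of statements sent. */
    static int writeOffsets(Range[] ranges, long batchTimeMs,
                            StatementExecutor executor) {
        List<Object[]> rows = toRows(ranges, batchTimeMs);
        for (Object[] row : rows) {
            // The Object[] is passed as the varargs parameter array.
            executor.execute(INSERT_CQL, row);
        }
        return rows.size();
    }
}
```

Inside foreachRDD you would presumably fill the Range stand-ins from each
OffsetRange (topic(), partition(), fromOffset(), untilOffset()), take the
batch time from batchMilliSec.milliseconds(), and pass an executor that wraps
whatever session you obtain from the connector or the plain Cassandra client.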
