Posted to user@spark.apache.org by Deepesh Maheshwari <de...@gmail.com> on 2015/08/26 14:55:02 UTC

Custom Offset Management

Hi Folks,

My Spark application reads data from Kafka through the Java API.
I am using the Direct Approach (No Receivers), which uses Kafka's simple
consumer API to read data.
So the Kafka offsets need to be handled explicitly.

In case of a Spark failure I need to save the Kafka offset state so that I
can resume from the failure point.
I am saving these offsets in MongoDB.

Please tell me how to initialize the Kafka direct stream in Spark Streaming
with the saved offsets.

This is a method I found on the web:

        KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams,
                topicsSet, fromOffsets, arg8);

arg8 - kafka.message.MessageAndMetadata

Please tell me how to handle and initialize this.

Regards,
Deepesh

Re: Custom Offset Management

Posted by Cody Koeninger <co...@koeninger.org>.
That argument takes a function from MessageAndMetadata to whatever you want
your stream to contain.

See

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala#L57
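
To make the shape of that function concrete, here is a minimal plain-Java sketch. It does not use the Spark or Kafka jars: TopicPartitionKey and KafkaMessage are hypothetical stand-ins for kafka.common.TopicAndPartition and kafka.message.MessageAndMetadata, java.util.function.Function stands in for Spark's own Function interface, and the topic name "events" and the offset values are made up (in the setup described above they would be loaded from MongoDB). As far as I recall, the 1.x Java overload that accepts fromOffsets also takes a record class and does not take topicsSet, so it is worth checking the KafkaUtils javadoc for your Spark version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for kafka.common.TopicAndPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for kafka.message.MessageAndMetadata<K, V>.
final class KafkaMessage<K, V> {
    final String topic;
    final int partition;
    final long offset;
    final K key;
    final V value;

    KafkaMessage(String topic, int partition, long offset, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

final class OffsetSketch {
    // Plays the role of fromOffsets: the offset to start reading from, per partition.
    // The values are hard-coded here; in practice they come from the external store.
    static Map<TopicPartitionKey, Long> startingOffsets() {
        Map<TopicPartitionKey, Long> fromOffsets = new HashMap<>();
        fromOffsets.put(new TopicPartitionKey("events", 0), 42L);
        fromOffsets.put(new TopicPartitionKey("events", 1), 17L);
        return fromOffsets;
    }

    // Plays the role of the last argument: keep only the payload of each message.
    static final Function<KafkaMessage<String, String>, String> VALUE_ONLY =
            m -> m.value;

    // Alternative handler: keep the position next to the payload,
    // so it can be saved once the record has been processed.
    static final Function<KafkaMessage<String, String>, String> WITH_OFFSET =
            m -> m.topic + "/" + m.partition + "@" + m.offset + " " + m.value;

    public static void main(String[] args) {
        KafkaMessage<String, String> msg =
                new KafkaMessage<>("events", 0, 42L, "k", "hello");
        System.out.println(VALUE_ONLY.apply(msg));   // hello
        System.out.println(WITH_OFFSET.apply(msg));  // events/0@42 hello
        System.out.println(startingOffsets().get(new TopicPartitionKey("events", 0))); // 42
    }
}
```

Whatever the handler returns becomes the element type of the stream, so returning only the value gives a stream of strings, while a handler like WITH_OFFSET carries the topic, partition and offset along with each record.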
