Posted to users@apex.apache.org by "Raja.Aravapalli" <Ra...@target.com> on 2016/06/06 22:06:38 UTC

kafka offset commit

Hi

Can someone please help me understand where the offsets are stored when consuming with “KafkaSinglePortStringInputOperator”?

And how should restarts be handled?


I worked with Storm earlier. Storm maintains the offsets in ZooKeeper, and a client id is maintained for every consumer, using which:

- we can see the current offset status for a given partition, and modify it as well, using zookeeper-cli
- restarts can be handled


As per the Apex documentation, I can see that restarts can be handled effectively using OffsetManager, but I couldn’t find any examples to refer to.

I am looking for how the clientId can be used to retrieve the offset status, and for the ability to edit the offsets.

Can someone please help me find this?


Thanks a lot!!


-Regards,
Raja.




Re: kafka offset commit

Posted by Devendra Tagare <de...@datatorrent.com>.
Hi,

I had started work on an offset manager for Kafka 0.8.x some time back, which
got left midway. This implementation used Kafka topics to store offsets
(similar to the 0.9 implementation):

https://github.com/apache/apex-malhar/pull/156

If the community would use it, I can incorporate the review comments and
make it Malhar-ready.

Thanks,
Dev

On Mon, Jun 6, 2016 at 9:55 PM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Hi Raja,
>
> Yes, I think if you implement the interface and set it as a property of
> the input operator, it should serve the purpose.
>
> I don't think it would be a bottleneck, since it is just a list of numbers
> and it is only updated every checkpoint interval.
>
> Regards,
> Siyuan

Re: kafka offset commit

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hi Raja,

Yes, I think if you implement the interface and set it as a property of the
input operator, it should serve the purpose.

I don't think it would be a bottleneck, since it is just a list of numbers
and it is only updated every checkpoint interval.

Regards,
Siyuan
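
To put a rough number on the "just a list of numbers" point above, here is a minimal sketch that encodes one offset per partition and reports the payload size. The encoding (a 4-byte partition id plus an 8-byte offset per entry) and the class and method names are assumptions made for illustration; the actual cost will depend on the datastore and format your OffsetManager implementation uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a hypothetical, compact encoding of per-partition offsets.
class OffsetPayloadSize {

    // Builds a sample map with one (made-up) offset per partition.
    static Map<Integer, Long> sampleOffsets(int partitions) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            offsets.put(p, 1_000_000L + p);
        }
        return offsets;
    }

    // Writes an entry count, then (int partition id, long offset) per entry.
    static byte[] encode(Map<Integer, Long> offsets) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(offsets.size());
            for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
                out.writeInt(entry.getKey());
                out.writeLong(entry.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // 4 bytes for the count + 64 entries * (4 + 8) bytes = 772 bytes.
        byte[] payload = encode(sampleOffsets(64));
        System.out.println("64 partitions -> " + payload.length + " bytes");
    }
}
```

Even with a few hundred partitions the payload stays in the low kilobytes, so the write itself is small; what remains is the per-write overhead of the chosen store, paid once per update rather than once per tuple.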

On Mon, Jun 6, 2016 at 5:43 PM, Raja.Aravapalli <Ra...@target.com>
wrote:

>
>
> Thanks a lot Siyuan. It helped me understand better!!
>
>
> So, can you please confirm: if I implement the OffsetManager interface,
> will it be used to load the initial starting position and to update the
> offset status (at some interval)?
>
> Will application latency increase significantly if I use HDFS for storage?
>
> Thank you very much.
>
> Regards,
> Raja.

Re: kafka offset commit

Posted by "Raja.Aravapalli" <Ra...@target.com>.

Thanks a lot, Siyuan. It helped me understand better!


So, can you please confirm: if I implement the OffsetManager interface, will it be used to load the initial starting position and to update the offset status (at some interval)?

Will application latency increase significantly if I use HDFS for storage?

Thank you very much.

Regards,
Raja.

From: "hsy541@gmail.com" <hs...@gmail.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 6, 2016 at 7:13 PM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: kafka offset commit

Raja,

Not exactly. Apex stores offsets as part of the operator state, and the state of the operator is checkpointed internally and periodically (in HDFS by default). For more details, you can read this: https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/

With that said, offsets are stored in HDFS along with the rest of the operator state, so that the operator can recover in case of a system failure.
Apex also supports stateful restart (starting the application by specifying the previous application id). It will initialize all operators, load the checkpointed state (offsets will be part of it) from HDFS, and continue running from that state. The only limitation is that you cannot easily tell where the current offsets are. Hope this answers your question.

Regards,
Siyuan



Re: kafka offset commit

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Raja,

Not exactly. Apex stores offsets as part of the operator state, and the
state of the operator is checkpointed internally and periodically (in HDFS
by default). For more details, you can read this:
https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/

With that said, offsets are stored in HDFS along with the rest of the
operator state, so that the operator can recover in case of a system failure.
Apex also supports stateful restart (starting the application by specifying
the previous application id). It will initialize all operators, load the
checkpointed state (offsets will be part of it) from HDFS, and continue
running from that state. The only limitation is that you cannot easily tell
where the current offsets are. Hope this answers your question.

Regards,
Siyuan
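
The idea that offsets travel with the operator state can be illustrated with a toy model. This is a sketch under stated assumptions, not Apex code: ToyKafkaInput, checkpoint and restore are hypothetical names, and plain Java serialization into a byte array stands in for Apex's own checkpointing to HDFS.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Toy model: offsets are ordinary operator fields, so a checkpoint captures them.
class CheckpointSketch {

    // Hypothetical operator that tracks the next offset to read per partition.
    static class ToyKafkaInput implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<Integer, Long> nextOffset = new HashMap<>();

        void consume(int partition, int messageCount) {
            nextOffset.merge(partition, (long) messageCount, Long::sum);
        }

        long position(int partition) {
            return nextOffset.getOrDefault(partition, 0L);
        }
    }

    // Stand-in for a periodic checkpoint: snapshot the whole operator state.
    static byte[] checkpoint(ToyKafkaInput operator) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(operator);
        } catch (IOException ex) {
            throw new IllegalStateException("checkpoint failed", ex);
        }
        return bytes.toByteArray();
    }

    // Stand-in for recovery or stateful restart: rebuild from the snapshot.
    static ToyKafkaInput restore(byte[] saved) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(saved))) {
            return (ToyKafkaInput) in.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("restore failed", ex);
        }
    }

    public static void main(String[] args) {
        ToyKafkaInput operator = new ToyKafkaInput();
        operator.consume(0, 100);
        byte[] saved = checkpoint(operator);   // state at offset 100
        operator.consume(0, 50);               // progress made after the checkpoint

        ToyKafkaInput recovered = restore(saved);
        // The recovered operator resumes from the checkpointed offset (100),
        // not from the position reached just before the failure (150).
        System.out.println("resumes at offset " + recovered.position(0));
    }
}
```

The sketch also shows why the current offsets are hard to inspect from outside: they exist only inside the serialized operator state, not in a separately queryable location such as ZooKeeper.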


On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <Ra...@target.com>
wrote:

>
> Thanks Siyuan.
>
> So, to confirm, Apex is not storing the offset status at any location,
> the way Storm stores it in ZooKeeper?
>
>
> Regards,
> Raja.

Re: kafka offset commit

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Thanks, Siyuan.

So, to confirm, Apex is not storing the offset status at any location, the way Storm stores it in ZooKeeper?


Regards,
Raja.

From: "hsy541@gmail.com" <hs...@gmail.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 6, 2016 at 6:42 PM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: kafka offset commit

Hey Raja,

For 0.8, you have to implement the OffsetManager interface on your own. The updateOffsets method will be called in the application master every time it gets updated offsets from each physical partition. The offsets that you see in the method are committed offsets, so you can safely save them to either ZooKeeper (the 0.8.2 client has an API to do that) or any other datastore, such as a DB or HDFS. You also have to implement the loadInitialOffsets method to load back the offsets you want.

You are welcome to contribute a default implementation of OffsetManager using the built-in Kafka offset commit request API!

Regards,
Siyuan


Re: kafka offset commit

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hey Raja,

For 0.8, you have to implement the OffsetManager interface on your own. The
updateOffsets method will be called in the application master every time it
gets updated offsets from each physical partition. The offsets that you see
in the method are committed offsets, so you can safely save them to either
ZooKeeper (the 0.8.2 client has an API to do that) or any other datastore,
such as a DB or HDFS. You also have to implement the loadInitialOffsets
method to load back the offsets you want.

You are welcome to contribute a default implementation of OffsetManager
using the built-in Kafka offset commit request API!

Regards,
Siyuan
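
A minimal sketch of the two methods described above, assuming a simplified stand-in for the interface. SketchOffsetManager and FileOffsetManager are hypothetical names, and partitions are keyed by a plain Integer id here; the real Malhar interface uses its own partition type, so check its signatures before adapting this. A local properties file stands in for ZooKeeper, a DB, or HDFS.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class FileOffsetManagerSketch {

    // Hypothetical stand-in for the interface named in the thread.
    interface SketchOffsetManager {
        Map<Integer, Long> loadInitialOffsets();
        void updateOffsets(Map<Integer, Long> offsetsOfPartitions);
    }

    // Keeps one "partitionId=offset" line per partition in a properties file.
    static class FileOffsetManager implements SketchOffsetManager {
        private final Path file;

        FileOffsetManager(Path file) {
            this.file = file;
        }

        @Override
        public Map<Integer, Long> loadInitialOffsets() {
            Map<Integer, Long> offsets = new TreeMap<>();
            if (!Files.exists(file)) {
                return offsets; // first launch: nothing stored yet
            }
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            for (String key : props.stringPropertyNames()) {
                offsets.put(Integer.valueOf(key), Long.valueOf(props.getProperty(key)));
            }
            return offsets;
        }

        @Override
        public void updateOffsets(Map<Integer, Long> offsetsOfPartitions) {
            // Merge with what is already stored, so partitions missing from
            // this update keep their last known offset.
            Map<Integer, Long> merged = loadInitialOffsets();
            merged.putAll(offsetsOfPartitions);
            Properties props = new Properties();
            for (Map.Entry<Integer, Long> entry : merged.entrySet()) {
                props.setProperty(entry.getKey().toString(), entry.getValue().toString());
            }
            try (OutputStream out = Files.newOutputStream(file)) {
                props.store(out, "last committed offset per partition");
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
    }

    public static void main(String[] args) {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"),
                "offsets-" + System.nanoTime() + ".properties");

        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(0, 42L);
        committed.put(1, 7L);
        new FileOffsetManager(file).updateOffsets(committed);
        new FileOffsetManager(file).updateOffsets(Collections.singletonMap(1, 9L));

        // A fresh instance, as after a restart, reads the stored positions back.
        System.out.println(new FileOffsetManager(file).loadInitialOffsets());
    }
}
```

Because the store is a readable text file, the offsets can be inspected or edited by hand between runs, which is the kind of visibility the original question asked about. The same shape applies if the file is swapped for a ZooKeeper node or a database row.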

On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <Ra...@target.com>
wrote:

>
> Hi Thomas,
>
> We are still using a 0.8 cluster!
>
>
> Regards,
> Raja.

Re: kafka offset commit

Posted by "Raja.Aravapalli" <Ra...@target.com>.
Hi Thomas,

We are still using a 0.8 cluster!


Regards,
Raja.

From: Thomas Weise <th...@gmail.com>
Reply-To: "users@apex.apache.org" <us...@apex.apache.org>
Date: Monday, June 6, 2016 at 5:23 PM
To: "users@apex.apache.org" <us...@apex.apache.org>
Subject: Re: kafka offset commit

Hi Raja,

Which Kafka version are you using?

With the new 0.9 connector there is no need for the offset manager:

https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka

Thanks,
Thomas



Re: kafka offset commit

Posted by Thomas Weise <th...@gmail.com>.
Hi Raja,

Which Kafka version are you using?

With the new 0.9 connector there is no need for the offset manager:

https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka

Thanks,
Thomas


On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <Ra...@target.com>
wrote:

> Hi
>
> Can someone please help me understand where the offsets are stored when
> consuming with “KafkaSinglePortStringInputOperator”?
>
> And how should restarts be handled?
>
>
> I worked with Storm earlier. Storm maintains the offsets in ZooKeeper, and
> a client id is maintained for every consumer, using which:
>
> - we can see the current offset status for a given partition, and modify
>   it as well, using zookeeper-cli
> - restarts can be handled
>
>
> As per the Apex documentation, I can see that restarts can be handled
> effectively using OffsetManager, but I couldn’t find any examples to refer
> to.
>
> I am looking for how the clientId can be used to retrieve the offset
> status, and for the ability to edit the offsets.
>
> Can someone please help me find this?
>
>
> Thanks a lot!!
>
>
> -Regards,
> Raja.
>
>
>
>