You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mayuresh Gharat <gh...@gmail.com> on 2015/03/27 17:17:27 UTC

Re: Kafka 8.2.1 Offset fetch Request

Hi Madhukar,

I am going through your code now. Let me see what I can find.

Where were you storing your offsets before?
Was it always Zookeeper or was it Kafka?
If it was Zookeeper, the correct way to migrate from zookeeper to kafka
based offsets is this :

1) Config Change :
     - offsets.storage = kafka
     - dual.commit.enabled = true
2) Rolling Bounce
3) Config Change :
     - dual.commit.enabled=false
4) Rolling Bounce.

For more info on Offset Management, you can also refer these slides from
Kafka Meetup:
http://www.slideshare.net/jjkoshy/offset-management-in-kafka


Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
or commit offset request you don't need a consumer. You need to know the
groupId. You need to connect to kafka, issue a consumerMetaData Request.
This will fetch you the OffsetManager for that groupId. You can then issue
the fetch or commit request to that OffsetManager.

BTW, we are coming up with an offsetClient soon.

Thanks,

Mayuresh

On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <bh...@gmail.com>
wrote:

> Hi Mayuresh,
>
> Please check this
> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java> program.
> Am I doing any mistake?
>
> Thanks
>
>
> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <bhartimadhukar@gmail.com
> > wrote:
>
>> Hi Mayuresh,
>>
>> I have tried to fetch the offset using OffsetFetchRequest as given in
>> this wiki
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>
>> But It only works if we set "dual.commit.enabled" to "true" and
>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>
>> Do I need to change anything?
>>
>>
>> Thanks in advance!
>>
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Kafka 8.2.1 Offset fetch Request

Posted by Mayuresh Gharat <gh...@gmail.com>.
On a first pass, the code has a small bug in getConsumedOffset().

You need to close the channel after getting the coordinator   : // broker =
cmr.coordinator();
 and then reconnect to coordinator as follows :

      broker = cmr.coordinator();
// if the coordinator is different, from the above channel's host then
reconnect
    *channel.disconnect();*
*    channel = new BlockingChannel(broker.host(), broker.port(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*                                          5000 /* read timeout in millis
*/);*
    *channel.connect();*
......


Same goes for commitOffsets().



Thanks,

Mayuresh


On Tue, Mar 31, 2015 at 11:52 AM, Madhukar Bharti <bh...@gmail.com>
wrote:

> Hi Mayuresh,
>
> Have you gone through that code? Please help me in that. If it will work
> for high level consumer then will plan accordingly to store offset of
> low-level consumer too in this offset topic. Will that work if I will give
> some group name for low-level consumers also but process it partition wise
> as Simple consumers are there.
>
> Thanks and awaiting for your response!
>
> On Mon, Mar 30, 2015 at 9:45 PM, Mayuresh Gharat <
> gharatmayuresh15@gmail.com> wrote:
>
>> Cool. Will try reviewing it today and get back :)
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, Mar 30, 2015 at 2:53 AM, Madhukar Bharti <
>> bhartimadhukar@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Thanks for your quick response!
>>>
>>> I have tried to use offset manager for fetch/commit request. I have set
>>> "dua.commit.enable" to false and using offset.storage=kafka.
>>>
>>> But I am not able to commit the offset. My code is here
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/tree/master/src/com/bharti/kafka/offset>
>>> .
>>>
>>> Kindly check this. Am I missing anything. I am running with single
>>> broker.
>>>
>>> Thanks!
>>>
>>> On Fri, Mar 27, 2015 at 10:06 PM, Mayuresh Gharat <
>>> gharatmayuresh15@gmail.com> wrote:
>>>
>>>> In your case you are trying to issue an offsetRequest and not a
>>>> fetchOffsetRequest. I know this is little confusing.
>>>>
>>>> Let me point you to a scala patch which has a client for doing fetch
>>>> offset and commit offset.
>>>>
>>>> I am going to rewrite that in java. Here is the Kafka ticket :
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1013
>>>>
>>>> You can look at the RB and see how it is done. If you have any further
>>>> questions I will be happy to answer them.
>>>>
>>>> Thanks,
>>>>
>>>> Mayuresh
>>>>
>>>>
>>>> On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat <
>>>> gharatmayuresh15@gmail.com> wrote:
>>>>
>>>>> Other thing is if you are using SimpleConsumer, it is up to your app
>>>>> to do the offsetManagement. The ZK based offsets or Kafka based offsets
>>>>> will work if you are using the HighLevel Consumer.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mayuresh
>>>>>
>>>>> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <
>>>>> gharatmayuresh15@gmail.com> wrote:
>>>>>
>>>>>> Hi Madhukar,
>>>>>>
>>>>>> I am going through your code now. Let me see what I can find.
>>>>>>
>>>>>> Where were you storing your offsets before?
>>>>>> Was it always Zookeeper or was it Kafka?
>>>>>> If it was Zookeeper, the correct way to migrate from zookeeper to
>>>>>> kafka based offsets is this :
>>>>>>
>>>>>> 1) Config Change :
>>>>>>      - offsets.storage = kafka
>>>>>>      - dual.commit.enabled = true
>>>>>> 2) Rolling Bounce
>>>>>> 3) Config Change :
>>>>>>      - dual.commit.enabled=false
>>>>>> 4) Rolling Bounce.
>>>>>>
>>>>>> For more info on Offset Management, you can also refer these slides
>>>>>> from Kafka Meetup:
>>>>>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>>>>>
>>>>>>
>>>>>> Apart from that for using Kafka based offsets, to do a
>>>>>> fetchOffsetRequest or commit offset request you don't need a consumer. You
>>>>>> need to know the groupId. You need to connect to kafka, issue a
>>>>>> consumerMetaData Request. This will fetch you the OffsetManager for that
>>>>>> groupId. You can then issue the fetch or commit request to that
>>>>>> OffsetManager.
>>>>>>
>>>>>> BTW, we are coming up with an offsetClient soon.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mayuresh
>>>>>>
>>>>>> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <
>>>>>> bhartimadhukar@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Mayuresh,
>>>>>>>
>>>>>>> Please check this
>>>>>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java> program.
>>>>>>> Am I doing any mistake?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>>>>>>> bhartimadhukar@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Mayuresh,
>>>>>>>>
>>>>>>>> I have tried to fetch the offset using OffsetFetchRequest as given
>>>>>>>> in this wiki
>>>>>>>>
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>>>>>>
>>>>>>>> But It only works if we set "dual.commit.enabled" to "true" and
>>>>>>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>>>>>>
>>>>>>>> Do I need to change anything?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -Regards,
>>>>>> Mayuresh R. Gharat
>>>>>> (862) 250-7125
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -Regards,
>>>>> Mayuresh R. Gharat
>>>>> (862) 250-7125
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -Regards,
>>>> Mayuresh R. Gharat
>>>> (862) 250-7125
>>>>
>>>
>>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Kafka 8.2.1 Offset fetch Request

Posted by Mayuresh Gharat <gh...@gmail.com>.
In your case you are trying to issue an offsetRequest and not a
fetchOffsetRequest. I know this is little confusing.

Let me point you to a scala patch which has a client for doing fetch offset
and commit offset.

I am going to rewrite that in java. Here is the Kafka ticket :

https://issues.apache.org/jira/browse/KAFKA-1013

You can look at the RB and see how it is done. If you have any further
questions I will be happy to answer them.

Thanks,

Mayuresh


On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat <gharatmayuresh15@gmail.com
> wrote:

> Other thing is if you are using SimpleConsumer, it is up to your app to do
> the offsetManagement. The ZK based offsets or Kafka based offsets will work
> if you are using the HighLevel Consumer.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <
> gharatmayuresh15@gmail.com> wrote:
>
>> Hi Madhukar,
>>
>> I am going through your code now. Let me see what I can find.
>>
>> Where were you storing your offsets before?
>> Was it always Zookeeper or was it Kafka?
>> If it was Zookeeper, the correct way to migrate from zookeeper to kafka
>> based offsets is this :
>>
>> 1) Config Change :
>>      - offsets.storage = kafka
>>      - dual.commit.enabled = true
>> 2) Rolling Bounce
>> 3) Config Change :
>>      - dual.commit.enabled=false
>> 4) Rolling Bounce.
>>
>> For more info on Offset Management, you can also refer these slides from
>> Kafka Meetup:
>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>
>>
>> Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
>> or commit offset request you don't need a consumer. You need to know the
>> groupId. You need to connect to kafka, issue a consumerMetaData Request.
>> This will fetch you the OffsetManager for that groupId. You can then issue
>> the fetch or commit request to that OffsetManager.
>>
>> BTW, we are coming up with an offsetClient soon.
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <
>> bhartimadhukar@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Please check this
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java> program.
>>> Am I doing any mistake?
>>>
>>> Thanks
>>>
>>>
>>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>>> bhartimadhukar@gmail.com> wrote:
>>>
>>>> Hi Mayuresh,
>>>>
>>>> I have tried to fetch the offset using OffsetFetchRequest as given in
>>>> this wiki
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>>
>>>> But It only works if we set "dual.commit.enabled" to "true" and
>>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>>
>>>> Do I need to change anything?
>>>>
>>>>
>>>> Thanks in advance!
>>>>
>>>
>>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Kafka 8.2.1 Offset fetch Request

Posted by Mayuresh Gharat <gh...@gmail.com>.
Other thing is if you are using SimpleConsumer, it is up to your app to do
the offsetManagement. The ZK based offsets or Kafka based offsets will work
if you are using the HighLevel Consumer.

Thanks,

Mayuresh

On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <gharatmayuresh15@gmail.com
> wrote:

> Hi Madhukar,
>
> I am going through your code now. Let me see what I can find.
>
> Where were you storing your offsets before?
> Was it always Zookeeper or was it Kafka?
> If it was Zookeeper, the correct way to migrate from zookeeper to kafka
> based offsets is this :
>
> 1) Config Change :
>      - offsets.storage = kafka
>      - dual.commit.enabled = true
> 2) Rolling Bounce
> 3) Config Change :
>      - dual.commit.enabled=false
> 4) Rolling Bounce.
>
> For more info on Offset Management, you can also refer these slides from
> Kafka Meetup:
> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>
>
> Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
> or commit offset request you don't need a consumer. You need to know the
> groupId. You need to connect to kafka, issue a consumerMetaData Request.
> This will fetch you the OffsetManager for that groupId. You can then issue
> the fetch or commit request to that OffsetManager.
>
> BTW, we are coming up with an offsetClient soon.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <bhartimadhukar@gmail.com
> > wrote:
>
>> Hi Mayuresh,
>>
>> Please check this
>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java> program.
>> Am I doing any mistake?
>>
>> Thanks
>>
>>
>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>> bhartimadhukar@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> I have tried to fetch the offset using OffsetFetchRequest as given in
>>> this wiki
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>
>>> But It only works if we set "dual.commit.enabled" to "true" and
>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>
>>> Do I need to change anything?
>>>
>>>
>>> Thanks in advance!
>>>
>>
>>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125