Posted to users@kafka.apache.org by Gerrit Jansen van Vuuren <ge...@gmail.com> on 2014/01/09 22:00:35 UTC

custom kafka consumer - strangeness

Hi,

I'm writing a custom consumer for kafka 0.8.
Everything works except for the following:

a. connect, send fetch, read all results
b. send fetch
c. send fetch
d. send fetch
e. via the console producer, publish 2 messages
f. send fetch :corr-id 1
g. read 2 messages published :offsets [10 11] :corr-id 1
h. send fetch :corr-id 2
i. read 2 messages published :offsets [10 11] :corr-id 2
j. send fetch ...

The problem is that I get the same messages twice, as responses to two
separate fetch requests. The correlation ids are distinct, so it cannot be
that I read the same response twice. The offsets of the 2 messages are the
same, so they are duplicates, and it's not the producer sending the
messages twice.

Note: the same connection is kept open the whole time, and I send, block
on receive, then send again. After the first 2 messages are read, the
offsets are incremented, and the next fetch asks kafka for messages from
the new offsets.

Any ideas why kafka would send the messages again on the second fetch
request?

Regards,
 Gerrit

Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Thanks, I will definitely put this in.
Does the console producer send compressed messages by default? I haven't
specified compression for it, so I assumed it would send plain text.





Re: custom kafka consumer - strangeness

Posted by Chris Curtin <cu...@gmail.com>.
If you look at the example simple consumer:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

You'll see:

    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset
                + " Expecting: " + readOffset);
        continue;
    }

and a comment in the 'Reading the Data' part:

Also note that we are explicitly checking that the offset being read is not
less than the offset that we requested. This is needed since if Kafka is
compressing the messages, the fetch request will return an entire
compressed block even if the requested offset isn't the beginning of the
compressed block. Thus a message we saw previously may be returned again.

This is probably what is happening to you.
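
As a minimal sketch, here is that guard in the context of the fetch loop,
following the names used in the SimpleConsumer example (fetchResponse,
topic, partition and readOffset are assumed to be defined by the
surrounding consumer code):

    import kafka.message.MessageAndOffset;

    // Iterate the fetched message set, skipping anything before the
    // offset we actually asked for.
    for (MessageAndOffset messageAndOffset :
            fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            // A fetch into the middle of a compressed block returns the
            // whole block, so earlier messages can reappear: skip them.
            continue;
        }
        readOffset = messageAndOffset.nextOffset(); // offset to request next
        // ... process messageAndOffset.message() here ...
    }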

Chris



Re: custom kafka consumer - strangeness

Posted by Jun Rao <ju...@gmail.com>.
Do you have the request log turned on? If so, what's the total time taken
for the corresponding fetch request?
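
If it isn't on: the broker's request log is normally controlled from
config/log4j.properties. A sketch of the relevant entries, assuming the
stock 0.8 config (logger names may differ between versions):

    # Set the request loggers to TRACE so each request, including its
    # total processing time, is written out via requestAppender.
    log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false
    log4j.logger.kafka.request.logger=TRACE, requestAppender
    log4j.additivity.kafka.request.logger=false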

Thanks,

Jun



Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
I'm also seeing the following.

I consume the data in the queue.
Then, after 10 seconds, I send another fetch request (with the incremented
offset) and never receive a response from the broker; my code eventually
times out (after 30 seconds).

The broker writes: Expiring fetch request Name: FetchRequest; Version: 0;
CorrelationId: 1389443537; ClientId: 1; ReplicaId: -1; MaxWait: 1000 ms;
MinBytes: 1 bytes; RequestInfo: [ping,0] ->
PartitionFetchInfo(187,1048576).

This corresponds with the timed out fetch request.
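
For reference, the MaxWait and MinBytes in that log line are fields of the
fetch request itself: with MaxWait 1000 ms and MinBytes 1, the broker parks
the fetch for up to a second waiting for at least one byte of data, then
"expires" it and answers with an empty message set. A sketch with the 0.8
Java API, plugging in the values from the log line (the custom client sets
the same fields directly on the wire):

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;

    FetchRequest req = new FetchRequestBuilder()
            .clientId("1")
            .maxWait(1000)  // ms the broker may hold the fetch open
            .minBytes(1)    // bytes that must accumulate before it answers
            .addFetch("ping", 0, 187, 1048576) // topic, partition, offset, maxBytes
            .build();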







Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Hi,

I've finally fixed this by closing the connection on timeout and creating a
new connection on the next send.
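
For illustration, the same workaround expressed against the blocking
SimpleConsumer API (the actual client is Netty-based, so the real fix
closes the channel and reconnects on the next send; host, port and
clientId are assumed from the surrounding setup):

    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    try {
        FetchResponse resp = consumer.fetch(req);
        // ... consume resp ...
    } catch (Exception e) { // e.g. a socket read timeout
        // Don't reuse a connection that has timed out mid-request:
        // tear it down and rebuild it before the next fetch.
        consumer.close();
        consumer = new SimpleConsumer(host, port, 30000 /* soTimeout ms */,
                64 * 1024, clientId);
    }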

Thanks,
 Gerrit



Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Hi,

Thanks, I will do this.




Re: custom kafka consumer - strangeness

Posted by Joe Stein <jo...@stealth.ly>.
Hi Gerrit, do you have a ticket already for this issue? Is it possible to
attach code that reproduces it? It would be great if you could run it
against a Kafka VM: you can grab one from this project for 0.8.0,
https://github.com/stealthly/scala-kafka, to launch a Kafka VM and add
whatever you need to reproduce the issue, or from
https://issues.apache.org/jira/browse/KAFKA-1173 for 0.8.1. I think if you
can reproduce it comfortably in a controlled, isolated environment, that
would be helpful for folks to reproduce and work towards a resolution....
At least if it is a bug we can get a detailed capture of what the bug is
in the JIRA ticket and start discussing how to fix it.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/



Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Yes, I'm using my own client following:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Everything works except for this weirdness.
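
For anyone following the same guide, a sketch of hand-encoding a v0 fetch
request per that document (big-endian, which is java.nio's default; the
helper name and the hard-coded MaxWaitTime/MinBytes/MaxBytes values here
are illustrative, not taken from the original client):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Layout: Size, ApiKey=1 (fetch), ApiVersion=0, CorrelationId,
    // ClientId, ReplicaId=-1, MaxWaitTime, MinBytes,
    // [TopicName [Partition FetchOffset MaxBytes]].
    static ByteBuffer encodeFetchRequest(int corrId, String clientId,
                                         String topic, int partition,
                                         long offset) {
        byte[] cid = clientId.getBytes(StandardCharsets.UTF_8);
        byte[] top = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer b = ByteBuffer.allocate(4 + 2 + 2 + 4 + 2 + cid.length
                + 4 + 4 + 4 + 4 + 2 + top.length + 4 + 4 + 8 + 4);
        b.putInt(b.capacity() - 4);              // Size of the rest
        b.putShort((short) 1);                   // ApiKey: 1 = fetch
        b.putShort((short) 0);                   // ApiVersion
        b.putInt(corrId);                        // CorrelationId
        b.putShort((short) cid.length).put(cid); // ClientId (int16-prefixed)
        b.putInt(-1);                            // ReplicaId: -1 for consumers
        b.putInt(1000);                          // MaxWaitTime ms
        b.putInt(1);                             // MinBytes
        b.putInt(1);                             // one topic in the array
        b.putShort((short) top.length).put(top); // TopicName
        b.putInt(1);                             // one partition
        b.putInt(partition);
        b.putLong(offset);                       // FetchOffset
        b.putInt(1048576);                       // MaxBytes for this partition
        b.flip();
        return b;
    }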



Re: custom kafka consumer - strangeness

Posted by Jun Rao <ju...@gmail.com>.
So, you implemented your own consumer client using Netty?

Thanks,

Jun



Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
I'm using Netty with async write and read.
For reads I use a timeout such that if I do not see anything on the read
channel, my read function times out and returns null.
I do not see any error on the socket, and the same socket is used
throughout all of the fetches.

I'm using the console producer, and the messages are "11", "22", "abc",
"iiii", etc.

I can reliably reproduce it every time.

It's weird, yes: no compression is used, and the timeout happens for the
same scenario every time.
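
Since the client code isn't shown, purely as an assumption about the
setup: one common way to get that "read times out" behaviour in Netty 4
is a ReadTimeoutHandler at the front of the pipeline, e.g.:

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;

    public class FetchChannelInitializer
            extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) {
            // Fires a ReadTimeoutException if nothing is read for 30s;
            // the caller can translate that into a null/empty read.
            ch.pipeline().addLast(new ReadTimeoutHandler(30));
            // ... response decoder and handler would follow here ...
        }
    }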




Re: custom kafka consumer - strangeness

Posted by Jun Rao <ju...@gmail.com>.
I can't seem to find the log trace for the timed out fetch request (every
fetch request seems to have a corresponding completed entry). For the
timed out fetch request, is it that the broker never completed the
request, or is it that it just took longer than the socket timeout to
finish processing the request? Do you use large messages in your test?

If you haven't enabled compression, it's weird that you would re-get 240
and 241 with an offset of 242 in the fetch request. Is that easily
reproducible?

Thanks,

Jun


Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Hi,

The offset in g is 240, and in i it is 242; the last message read was at
offset 239.

After reading from 0 to 239, I make another request for 240; this request
times out and never returns.
I then manually add 2 entries via the console producer, all the while
making a request for offset 240 every 10 seconds. All subsequent requests
for offset 240 return empty messages until the responses are written.
Then I get the 2 messages at offsets 240 and 241 and an end of response.
Then I make a request for offset 242, and get the messages at offsets
240 and 241 again.

I've attached a portion of the kafka-request.log set to trace.

The correlation ids are:
1389604489 - first request at offset 0
1389604511 - timeout at offset 240
1389604563 - got data request at offset 240
1389604573 - got duplicates request at offset 242

Regards,
 Gerrit





Re: custom kafka consumer - strangeness

Posted by Jun Rao <ju...@gmail.com>.
What are the offsets used in the fetch requests in steps g and i that
both returned offsets 10 and 11?

Thanks,

Jun



Re: custom kafka consumer - strangeness

Posted by Gerrit Jansen van Vuuren <ge...@gmail.com>.
Hi,


No, the offsets are not the same. I've printed out the values to check
this, and that's not the case.




Re: custom kafka consumer - strangeness

Posted by Jun Rao <ju...@gmail.com>.
Are the offsets used in the 2 fetch requests the same? If so, you will get
the same messages twice. Your consumer is responsible for advancing the
offsets after consumption.
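
A sketch of that advancement with the 0.8 Java API (names follow the
SimpleConsumer example; clientId, topic, partition and consumer are
assumed from the surrounding setup, and the custom client does the
equivalent over the raw protocol):

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.message.MessageAndOffset;

    long nextOffset = 0L; // or an offset recovered from storage
    while (true) {        // simplified: no error handling or backoff
        FetchRequest req = new FetchRequestBuilder()
                .clientId(clientId)
                .addFetch(topic, partition, nextOffset, 100000)
                .build();
        for (MessageAndOffset mao :
                consumer.fetch(req).messageSet(topic, partition)) {
            // nextOffset() is the offset to ask for next (offset + 1),
            // so the following fetch starts after what was consumed.
            nextOffset = mao.nextOffset();
        }
    }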

Thanks,

Jun

