Posted to users@kafka.apache.org by Jack <ja...@gmail.com> on 2015/04/07 00:15:21 UTC

Kafka question

Hi folks,

I have a quick question.

We are using Kafka 0.8.1 and running into a weird problem. We are using the
high-level consumer for this topic, which was created with 64 partitions.

In our service, we first create a Consumer object as usual and then call
'createMessageStreams' with Map('topic_name' -> 64). It returns a
Seq[KafkaStream]. For each stream in the sequence, we submit a task like the
following to a thread pool.

threadpool.submit(new Runnable {
  override def run() = {
    stream.iterator().foreach { msg => ... }
  }
})

The problem we ran into is that once all of the above is set up, any message
showing up in Kafka should be visible on the consumer side. In reality,
though, we occasionally don't see some of these messages (we do see them in
the broker log).

Some team members believe that the stream might start from a later offset and
thus never see the earlier messages.

I doubt that explanation and want to see if anyone can shed some light on
this.

One theory of mine is that the offset is not resolved until
stream.iterator().next is called; since the task submission is asynchronous
(we don't wait for the submitted tasks to start before producing messages to
Kafka), the consumer could end up with a later offset that does not include
the messages we want. One possible fix is to perform any action that produces
messages to Kafka only after all the submitted tasks have started consuming
(see the sketch below).
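
For illustration, here is a rough, untested sketch of that ordering: each
consumer task counts down a latch once it has attached to its stream, and
production is only triggered after every task has checked in. The ZooKeeper
address, group id, topic name, and startProducing() are placeholders, not our
real values, and whether attaching to the iterator is enough to pin the offset
is exactly the open question here.

import java.util.Properties
import java.util.concurrent.{CountDownLatch, Executors}
import kafka.consumer.{Consumer, ConsumerConfig}

def startProducing(): Unit = ()   // placeholder for whatever produces the expected messages

val props = new Properties()
props.put("zookeeper.connect", "zkhost:2181")
props.put("group.id", "unique-run-id")
props.put("auto.offset.reset", "largest")

val connector = Consumer.create(new ConsumerConfig(props))
val streams = connector.createMessageStreams(Map("topic_name" -> 64))("topic_name")

val threadpool = Executors.newFixedThreadPool(streams.size)
val ready = new CountDownLatch(streams.size)

streams.foreach { stream =>
  threadpool.submit(new Runnable {
    override def run(): Unit = {
      val it = stream.iterator()                 // attach to the stream before signalling readiness
      ready.countDown()
      it.foreach { msg => println(msg.offset) }  // replace with real message handling
    }
  })
}

ready.await()      // every consumer task is now blocked on its stream
startProducing()   // only now trigger whatever produces the messages we expect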

Any thoughts?

Thanks,

-Jack

Re: Kafka question

Posted by Jack <ja...@gmail.com>.
That would be really useful. Thanks for the write-up, Guozhang. I will give
it a shot and let you know.


Re: Kafka question

Posted by Guozhang Wang <wa...@gmail.com>.
Jack,

Okay, I see your point now. I was originally thinking that in each run you
1) first create the topic, 2) start producing to the topic, 3) start
consuming from the topic, and then 4) delete the topic and stop the
producers / consumers before completing, but it sounds like you actually
only create the topic once.

If that is the case and you always use a different group id, then yes, with
the current consumer you have to make sure that at the boundary of each run,
when you stop the consumers, you also halt the producers until the next run
starts. The new consumer that we are currently developing allows you to
specify the starting offset for your consumption; you could then checkpoint
offsets outside Kafka on the consumer side and use the checkpointed offsets
when you resume each run.

You can find the new consumer's API here (check position() / seek()
specifically) and let me know if you think that will work for your case.

http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
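
A rough sketch of that checkpoint-and-seek pattern, written against the new
consumer API as eventually released (method names in the draft javadoc above
may differ slightly); loadCheckpoint, saveCheckpoint, and the running flag are
placeholders for your own external checkpoint store and loop control:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Placeholders for an external (non-Kafka) checkpoint store.
def loadCheckpoint(tp: TopicPartition): Long = 0L
def saveCheckpoint(tp: TopicPartition, offset: Long): Unit = ()
@volatile var running = true

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "run-" + System.currentTimeMillis())
props.put("enable.auto.commit", "false")   // offsets are checkpointed externally instead
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val partitions = (0 until 64).map(p => new TopicPartition("topic_name", p))
consumer.assign(partitions.asJava)

// Resume each partition from the offset checkpointed by the previous run.
partitions.foreach(tp => consumer.seek(tp, loadCheckpoint(tp)))

while (running) {
  val records = consumer.poll(1000L)
  records.asScala.foreach(record => println(record.offset))   // replace with real handling
  // Save the current position externally so the next run can resume from here.
  partitions.foreach(tp => saveCheckpoint(tp, consumer.position(tp)))
}
consumer.close()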

Guozhang




-- 
-- Guozhang

Re: Kafka question

Posted by Jack <ja...@gmail.com>.
How about the first run then? If we use "largest" as the "auto.offset.reset"
value, what offset will these consumers get? I assume it will point to the
latest position in the log. Is that true? Just so you know, we can't have a
warm-up run so that later runs can use the offset committed by that run.

To give you a bit more context, for every run we create a unique group.id;
essentially, we want the offset to point to a safe position so that the
consumer won't miss any messages appended after that point. So is there a way
other than setting "auto.offset.reset" to "smallest", which we know works but
takes forever to get through the data (since the log is long)?

Thanks again.

-Jack


Re: Kafka question

Posted by Guozhang Wang <wa...@gmail.com>.
Did you turn on automatic offset committing? If yes, then this issue should
not happen, as later runs will just consume data from the last committed
offset.
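
For reference, a minimal sketch of the relevant high-level consumer properties
(names as in the 0.8.1 docs; the ZooKeeper address, group id, and interval
here are only illustrative):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "zkhost:2181")
props.put("group.id", "my-service")            // reuse the same group id across runs
props.put("auto.commit.enable", "true")        // periodically commit consumed offsets to ZooKeeper
props.put("auto.commit.interval.ms", "60000")
props.put("auto.offset.reset", "largest")      // only consulted when no committed offset exists

val connector = Consumer.create(new ConsumerConfig(props))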

Guozhang




-- 
-- Guozhang

Re: Kafka question

Posted by Jack <ja...@gmail.com>.
Hi Guozhang,

When I switched auto.offset.reset to smallest, it worked. However, it
generates a lot of data and slows down the verification.

Thanks,

-Jack


Re: Kafka question

Posted by Guozhang Wang <wa...@gmail.com>.
Jack,

Could you just change "auto.offset.reset" to smallest and see if this issue
goes away? It is not related to the producer end.

Guozhang




-- 
-- Guozhang

Re: Kafka question

Posted by Jack <ja...@gmail.com>.
Hi Guozhang,

Thanks so much for replying, first of all.

Here is the config we have:

group.id -> 'some unique id'
zookeeper.connect -> 'zookeeper host'
auto.commit.enabled -> false
auto.offset.reset -> largest
consumer.timeout.ms -> -1
fetch.message.max.bytes -> 10M

So it seems like we need to make sure the submitted futures return before
performing any actions which eventually generate the messages we expect.

Cheers,

-Jack




Re: Kafka question

Posted by Guozhang Wang <wa...@gmail.com>.
Jack,

Your theory is correct if your consumer config sets auto.offset.reset to
largest and you do not have any committed offsets from before. Could you list
your consumer configs and see if that is the case?

Guozhang




-- 
-- Guozhang