Posted to users@kafka.apache.org by Xiaoyu Wang <xw...@rocketfuel.com> on 2015/02/20 19:48:48 UTC

KafkaProducer from kafka.clients hangs when some partitions are not available

Hello,

I am experimenting with sending data to Kafka using KafkaProducer and found
that when a partition is completely offline (e.g. a topic with replication
factor = 1 and one of the brokers is down), KafkaProducer seems to hang
forever. It does not exit even with the timeout setting. Can you take a look?

I checked the code and found that the partitioner chooses a partition based
on the total number of partitions, including the offline ones. Is it possible
to change ProducerClient to ignore offline partitions?


Thanks,

-Xiaoyu
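A minimal sketch of the suggestion above: skip partitions that have no leader and round-robin over the rest. This is not the kafka-clients implementation. `PartitionStub` and `AvailableRoundRobin` are hypothetical names, a partition counts as available when its leader is non-null (mirroring the `leader() != null` check quoted later in this thread), and the topic is assumed to have at least one partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for partition metadata: the partition id and its
// leader broker id, where a null leader means the partition is offline.
final class PartitionStub {
    final int id;
    final Integer leader;

    PartitionStub(int id, Integer leader) {
        this.id = id;
        this.leader = leader;
    }
}

// Hypothetical round-robin chooser that only considers partitions with a leader.
final class AvailableRoundRobin {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Assumes `partitions` is non-empty.
    int choose(List<PartitionStub> partitions) {
        List<PartitionStub> available = new ArrayList<>();
        for (PartitionStub p : partitions) {
            if (p.leader != null) {
                available.add(p);
            }
        }
        // One counter read per call; the mask keeps the value non-negative after overflow.
        int next = counter.getAndIncrement() & 0x7fffffff;
        if (!available.isEmpty()) {
            // Return the partition's own id, not its position in the list.
            return available.get(next % available.size()).id;
        }
        // Nothing has a leader: fall back to some partition, which cannot be sent to until a leader exists.
        return partitions.get(next % partitions.size()).id;
    }
}
```

For example, with four partitions where (hypothetically) partitions 1 and 3 have no leader, successive calls return 0, 2, 0, 2, and so on. Reading the counter once per call, rather than once per loop iteration, also means concurrent callers cannot push a single call past all of the available partitions.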

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Jun Rao <ju...@confluent.io>.
Xiaoyu,

For 1, I have a patch for 0.8.2 in
https://issues.apache.org/jira/browse/KAFKA-1984. Could you test it out and
see if it fixes your issue?

For 2, I did some local testing. The only issue I saw is that the producer
can block on close since there are still unsent messages in the buffer pool.
This is a known issue and is being tracked in
https://issues.apache.org/jira/browse/KAFKA-1788. Could you confirm whether
your producer blocks during send or during close (you can figure it out by
taking a thread dump)?

Thanks,

Jun
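A thread dump can be taken externally with the JDK's `jstack <pid>`. If it is more convenient to capture one from inside the test program, a minimal sketch using the standard `java.lang.management` API follows; `ThreadDumps`, `dump` and `anyThreadIn` are hypothetical helper names. Finding `KafkaProducer` frames named `send` or `close` on a stuck thread's stack shows which call it is blocked in.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helpers for inspecting where threads are currently blocked.
final class ThreadDumps {
    // Full stack of every live thread (ThreadInfo.toString() would cut stacks short).
    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(), mx.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    // True if some live thread has the given class and method on its stack right now.
    static boolean anyThreadIn(String className, String methodName) {
        for (StackTraceElement[] stack : Thread.getAllStackTraces().values()) {
            for (StackTraceElement frame : stack) {
                if (frame.getClassName().equals(className)
                        && frame.getMethodName().equals(methodName)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

While the program appears hung, `ThreadDumps.anyThreadIn("org.apache.kafka.clients.producer.KafkaProducer", "close")` (or `"send"`) gives a quick yes/no, and `dump()` gives the full picture.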



On Tue, Feb 24, 2015 at 10:14 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Jun,
>
> Can you also take a look at the second problem I am having?
>
> > I am trying to test how KafkaProducer behaves with topic replication
> > factor = 1
> >
> >    1. One broker is offline BEFORE KafkaProducer starts sending messages.
> >    Because of the bug I mentioned, KafkaProducer sends to the offline
> >    partition and hangs forever.
> >    2. One broker goes offline WHILE KafkaProducer is sending messages.
> >    KafkaProducer seems to be hanging forever in this case. I am still
> >    looking.

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Xiaoyu Wang <xw...@rocketfuel.com>.
Jun,

Can you also take a look at the second problem I am having?

> I am trying to test how KafkaProducer behaves with topic replication
> factor = 1
>
>    1. One broker is offline BEFORE KafkaProducer starts sending messages.
>    Because of the bug I mentioned, KafkaProducer sends to the offline
>    partition and hangs forever.
>    2. One broker goes offline WHILE KafkaProducer is sending messages.
>    KafkaProducer seems to be hanging forever in this case. I am still
>    looking.

On Tue, Feb 24, 2015 at 12:03 PM, Jun Rao <ju...@confluent.io> wrote:

> Ah, yes. You are right. That's a more obvious bug. Will fix that in
> KAFKA-1984.
>
> Thanks,
>
> Jun

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Jun Rao <ju...@confluent.io>.
Ah, yes. You are right. That's a more obvious bug. Will fix that in
KAFKA-1984.

Thanks,

Jun

On Tue, Feb 24, 2015 at 8:37 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Hi Jun,
>
> If I understand it correctly, the highlighted line is for avoiding
> offline partitions, isn't it?
>
> for (int i = 0; i < numPartitions; i++) {
>     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
>     if (partitions.get(partition).leader() != null) {
>         return partition; // --> should be changed to return the actual partition number?
>     }
> }

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Xiaoyu Wang <xw...@rocketfuel.com>.
Hi Jun,

If I understand it correctly, the highlighted line is for avoiding
offline partitions, isn't it?

for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null) {
        return partition; // --> should be changed to return the actual partition number?
    }
}
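A deterministic sketch of the difference between returning the loop position and returning the partition's own id. It assumes the metadata list is not ordered by partition id (only then do the two disagree). `IndexVsId`, `Meta`, `pickIndex` and `pickId` are hypothetical names, and the shared counter is replaced by an explicit non-negative start value so the result is reproducible.

```java
import java.util.List;

// Hypothetical demo: the position in the metadata list is not necessarily the partition id.
final class IndexVsId {
    // Minimal stand-in for partition metadata; leader == null means offline.
    static final class Meta {
        final int id;
        final Integer leader;

        Meta(int id, Integer leader) {
            this.id = id;
            this.leader = leader;
        }
    }

    // Mirrors `return partition;`: returns the position in the list.
    static int pickIndex(List<Meta> partitions, int start) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            if (partitions.get(idx).leader != null) {
                return idx;
            }
        }
        return start % n;
    }

    // Mirrors `return partitions.get(partition).partition();`: returns the real id.
    static int pickId(List<Meta> partitions, int start) {
        int n = partitions.size();
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            if (partitions.get(idx).leader != null) {
                return partitions.get(idx).id;
            }
        }
        return partitions.get(start % n).id;
    }
}
```

With the list ordered by id as [1, 0, 3, 2] and partitions 1 and 3 offline, `pickIndex(list, 0)` returns 1, which is an offline partition, while `pickId(list, 0)` returns 0.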


On Tue, Feb 24, 2015 at 11:30 AM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Xiaoyu,
>
> 1. Could you explain a bit more what the bug is? The code does try to avoid
> picking an unavailable partition. There does seem to be an issue when there
> is more than one thread producing data to the same producer instance. This
> is being tracked in KAFKA-1984. How many producing threads do you have in
> your test?
>
> Thanks,
>
> Jun

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Jun Rao <ju...@confluent.io>.
Hi, Xiaoyu,

1. Could you explain a bit more what the bug is? The code does try to avoid
picking an unavailable partition. There does seem to be an issue when there
is more than one thread producing data to the same producer instance. This
is being tracked in KAFKA-1984. How many producing threads do you have in
your test?

Thanks,

Jun
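One way to probe the multi-threaded case is to stress the selection logic on its own. The sketch below assumes partition ids equal array positions and that a `null` entry in `leaderOf` marks an offline partition. It shares one chooser across several threads (as several threads would share one producer) and counts how often an offline partition comes back. `SharedChooser` is a hypothetical name; its `choose` reads the shared counter once per call, so it should report zero offline picks however the threads interleave.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical chooser shared by many threads; partition ids equal array indices.
final class SharedChooser {
    private final Integer[] leaderOf; // null entry = partition has no leader
    private final AtomicInteger counter = new AtomicInteger();

    SharedChooser(Integer[] leaderOf) {
        this.leaderOf = leaderOf;
    }

    boolean isOffline(int partition) {
        return leaderOf[partition] == null;
    }

    // Collect the available ids, then read the shared counter exactly once.
    int choose() {
        int[] available = new int[leaderOf.length];
        int n = 0;
        for (int p = 0; p < leaderOf.length; p++) {
            if (leaderOf[p] != null) {
                available[n++] = p;
            }
        }
        int next = counter.getAndIncrement() & 0x7fffffff;
        return n > 0 ? available[next % n] : next % leaderOf.length;
    }

    // Runs `threads` workers, each calling choose() `calls` times; returns the offline picks.
    int countOfflinePicks(int threads, int calls) {
        AtomicInteger offline = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < calls; i++) {
                    if (isOffline(choose())) {
                        offline.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return offline.get();
    }
}
```

A zero count is evidence rather than proof of thread safety, since whether a race shows up depends on scheduling.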

On Tue, Feb 24, 2015 at 7:56 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Jun,
>
> I am trying to test how KafkaProducer behaves with topic replication
> factor = 1:
>
>    1. One broker is offline BEFORE KafkaProducer starts sending messages.
>    Because of the bug I mentioned, KafkaProducer sends to the offline
>    partition and hangs forever.
>    2. One broker goes offline WHILE KafkaProducer is sending messages.
>    KafkaProducer seems to be hanging forever in this case. I am still
>    looking.
>
> Do you mind taking a look?
>
> Thanks

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Xiaoyu Wang <xw...@rocketfuel.com>.
Jun,

I am trying to test how KafkaProducer behaves with topic replication
factor = 1:

   1. One broker is offline BEFORE KafkaProducer starts sending messages.
   Because of the bug I mentioned, KafkaProducer sends to the offline
   partition and hangs forever.
   2. One broker goes offline WHILE KafkaProducer is sending messages.
   KafkaProducer seems to be hanging forever in this case. I am still looking.

Do you mind taking a look?

Thanks
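While the hang in case 2 is being investigated, a caller can at least avoid waiting forever on an individual send. `KafkaProducer.send` returns a `Future<RecordMetadata>`; the minimal sketch below bounds the wait with the standard `Future.get(long, TimeUnit)`. `BoundedWait` is a hypothetical helper that works with any `Future`, and it cannot help if the call to `send` itself blocks before returning the future.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: wait at most timeoutMs for a future instead of calling get() unbounded.
final class BoundedWait {
    // Empty if the result did not arrive in time (or the wait was interrupted);
    // a failed future is rethrown as an unchecked exception carrying the cause.
    static <T> Optional<T> awaitResult(Future<T> future, long timeoutMs) {
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }
}
```

For example, `BoundedWait.awaitResult(producer.send(record), 5000)` returns `Optional.empty()` after roughly five seconds if the record is still unacknowledged, where `producer` and `record` stand for whatever the test already uses.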




On Mon, Feb 23, 2015 at 7:01 PM, Jun Rao <ju...@confluent.io> wrote:

> The logic in that code is to cycle through all partitions and return as
> soon as we see a partition with the leader. I do see an issue that if there
> are multiple threads sending messages to the same producer concurrently, we
> may not cycle through all partitions and therefore we could return an
> unavailable partition even when available partitions are present.
>
> Do you see this issue with just a single thread producing messages? The
> current logic seems to work correctly in that case.
>
> Thanks,
>
> Jun
>
> On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <xw...@rocketfuel.com>
> wrote:
>
> > Found the problem: it is a bug in the partitioner of the Kafka client. Can
> > you guys confirm and patch it in the Kafka clients?
> >
> > for (int i = 0; i < numPartitions; i++) {
> >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> >     if (partitions.get(partition).leader() != null) {
> >         return partitions.get(partition).partition();
> >     }
> > }
> >
> >
> >
> > On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <xw...@rocketfuel.com>
> wrote:
> >
> > > Update:
> > >
> > > I am using kafka.clients 0.8.2-beta. Below are the test steps:
> > >
> > >    1. set up a local Kafka cluster with 2 brokers, 0 and 1
> > >    2. create topic X with replication factor 1 and 4 partitions
> > >    3. verify that each broker has two partitions
> > >    4. shut down broker 1
> > >    5. start a producer sending data to topic X using KafkaProducer with
> > >    required ack = 1
> > >    6. the producer hangs and does not exit.
> > >
> > > Offline partitions are taken care of when the partition's leader is
> > > null (code attached below). However, the timeout setting does not seem
> > > to work. Not sure what caused KafkaProducer to hang.
> > >
> > > // choose the next available node in a round-robin fashion
> > > for (int i = 0; i < numPartitions; i++) {
> > >     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
> > >     if (partitions.get(partition).leader() != null)
> > >         return partition;
> > > }
> > > // no partitions are available, give a non-available partition
> > > return Utils.abs(counter.getAndIncrement()) % numPartitions;
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <xw...@rocketfuel.com>
> > wrote:
> > >
> > >> Hello,
> > >>
> > >> I am experimenting sending data to kafka using KafkaProducer and found
> > >> that when a partition is completely offline, e.g. a topic with
> > replication
> > >> factor = 1 and some broker is down, KafkaProducer seems to be hanging
> > >> forever. Not even exit with the timeout setting. Can you take a look?
> > >>
> > >> I checked code and found that the partitioner create partition based
> on
> > >> the total partition number - including those offline partitions. Is it
> > >> possible that we change ProducerClient to ignore offline partitions?
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> -Xiaoyu
> > >>
> > >>
> > >
> >
>

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Jun Rao <ju...@confluent.io>.
The logic in that code is to cycle through all partitions and return as
soon as we see a partition with the leader. I do see an issue that if there
are multiple threads sending messages to the same producer concurrently, we
may not cycle through all partitions and therefore we could return an
unavailable partition even when available partitions are present.
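The multi-threaded case described here can be avoided by taking a single ticket from the shared counter per call and scanning forward from it, instead of calling getAndIncrement() on every loop iteration. The sketch below is our own illustration, not the actual patch: `Part` is a hypothetical stand-in for Kafka's PartitionInfo, and the partition layout (broker 1 down, so partitions 1 and 3 have no leader) is assumed for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class StableRoundRobin {
    // Hypothetical stand-in for PartitionInfo: partition id plus leader broker id.
    public static final class Part {
        public final int id;
        public final Integer leader; // null = no leader, i.e. the partition is offline
        public Part(int id, Integer leader) { this.id = id; this.leader = leader; }
    }

    // Shared by all threads, like the counter in the producer's partitioner.
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Clear the sign bit so the ticket stays non-negative after int overflow.
    private static int toPositive(int n) { return n & 0x7fffffff; }

    // Take ONE ticket, then scan every offset from it. Other threads can advance
    // COUNTER freely without making this call skip any partition.
    public static int choose(List<Part> parts) {
        int n = parts.size();
        if (n == 0) throw new IllegalArgumentException("topic has no partitions");
        int start = toPositive(COUNTER.getAndIncrement()) % n;
        for (int i = 0; i < n; i++) {
            Part p = parts.get((start + i) % n); // start + i < 2n, so no overflow
            if (p.leader != null) return p.id;   // return the id, not the list index
        }
        return parts.get(start).id; // nothing has a leader: fall back to the ticket's entry
    }

    // Calls choose() from several threads and counts picks of leaderless partitions.
    public static int offlinePicks(List<Part> parts, int threads, int calls) {
        AtomicInteger offline = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < calls; k++) {
                    int id = choose(parts);
                    for (Part p : parts) {
                        if (p.id == id && p.leader == null) offline.incrementAndGet();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return offline.get();
    }

    public static void main(String[] args) {
        // Broker 1 is down, so partitions 1 and 3 have no leader.
        List<Part> parts = Arrays.asList(
            new Part(1, null), new Part(0, 0), new Part(3, null), new Part(2, 0));
        System.out.println("offline picks: " + offlinePicks(parts, 8, 10_000)); // prints "offline picks: 0"
    }
}
```

Because each call inspects all n entries before giving up, it returns a leaderless partition only when no partition has a leader, however the threads interleave.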

Do you see this issue with just a single thread producing messages? The
current logic seems to work correctly in that case.

Thanks,

Jun

On Fri, Feb 20, 2015 at 12:45 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Found the problem: it is a bug in the Partitioner of the Kafka client. Can
> you guys confirm it and patch kafka.clients?
>
> for (int i = 0; i < numPartitions; i++) {
>     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
>     if (partitions.get(partition).leader() != null) {
>         return partitions.get(partition).partition();
>     }
> }
>

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Xiaoyu Wang <xw...@rocketfuel.com>.
Found the problem: it is a bug in the Partitioner of the Kafka client. Can
you guys confirm it and patch kafka.clients?

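// Round-robin over the topic's partitions, skipping any that have no leader.
// Return the partition id stored in the PartitionInfo, not the list index:
// the list is not necessarily ordered by partition id.
for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null) {
        return partitions.get(partition).partition();
    }
}
// No partition has a leader: fall back to one, again by id rather than index.
return partitions.get(Utils.abs(counter.getAndIncrement()) % numPartitions).partition();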
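To see why returning the list index can pick an offline partition, here is a small self-contained simulation of the old loop next to the changed one. `Part` is a hypothetical stand-in for Kafka's PartitionInfo, and the list order [1, 0, 3, 2] is an assumption chosen for illustration; all that matters is that the metadata list is not sorted by partition id.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexVsIdDemo {
    // Hypothetical stand-in for PartitionInfo: partition id plus leader broker (null = offline).
    public static final class Part {
        public final int id;
        public final Integer leader;
        public Part(int id, Integer leader) { this.id = id; this.leader = leader; }
    }

    // Clear the sign bit so the counter value stays non-negative after overflow.
    static int toPositive(int n) { return n & 0x7fffffff; }

    // Like the 0.8.2-beta loop: checks the entry at `index` but returns the index itself.
    public static int returnsIndex(List<Part> parts, AtomicInteger counter) {
        int n = parts.size();
        for (int i = 0; i < n; i++) {
            int index = toPositive(counter.getAndIncrement()) % n;
            if (parts.get(index).leader != null) return index;
        }
        return toPositive(counter.getAndIncrement()) % n;
    }

    // Like the proposed change: returns the id of the entry that was checked.
    public static int returnsId(List<Part> parts, AtomicInteger counter) {
        int n = parts.size();
        for (int i = 0; i < n; i++) {
            int index = toPositive(counter.getAndIncrement()) % n;
            if (parts.get(index).leader != null) return parts.get(index).id;
        }
        return parts.get(toPositive(counter.getAndIncrement()) % n).id;
    }

    public static void main(String[] args) {
        // Broker 1 (leader of partitions 1 and 3) is down; the list is not sorted by id.
        List<Part> parts = Arrays.asList(
            new Part(1, null), new Part(0, 0), new Part(3, null), new Part(2, 0));
        AtomicInteger c1 = new AtomicInteger();
        AtomicInteger c2 = new AtomicInteger();
        for (int k = 0; k < 4; k++) {
            System.out.println("index-based: " + returnsIndex(parts, c1)
                + ", id-based: " + returnsId(parts, c2));
        }
        // prints:
        // index-based: 1, id-based: 0
        // index-based: 3, id-based: 2
        // index-based: 1, id-based: 0
        // index-based: 3, id-based: 2
    }
}
```

With this ordering the index-based loop only ever returns partitions 1 and 3, the two without a leader, which matches the hang in the test steps; the id-based loop returns only 0 and 2.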



On Fri, Feb 20, 2015 at 2:35 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Update:
>
> I am using kafka.clients 0.8.2-beta. Here are the test steps:
>
>    1. Set up a local Kafka cluster with 2 brokers, 0 and 1.
>    2. Create topic X with replication factor 1 and 4 partitions.
>    3. Verify that each broker has two partitions.
>    4. Shut down broker 1.
>    5. Start a producer sending data to topic X using KafkaProducer with
>    required acks = 1.
>    6. The producer hangs and does not exit.
>
> Offline partitions are supposed to be taken care of by the check for a
> null leader (code attached below). However, the timeout setting does not
> seem to work. I am not sure what is causing KafkaProducer to hang.
>
> // choose the next available node in a round-robin fashion
> for (int i = 0; i < numPartitions; i++) {
>     int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
>     if (partitions.get(partition).leader() != null)
>         return partition;
> }
> // no partitions are available, give a non-available partition
> return Utils.abs(counter.getAndIncrement()) % numPartitions;
>

Re: KafkaProducer from kafka.clients hangs when some partitions are not available

Posted by Xiaoyu Wang <xw...@rocketfuel.com>.
Update:

I am using kafka.clients 0.8.2-beta. Here are the test steps:

   1. Set up a local Kafka cluster with 2 brokers, 0 and 1.
   2. Create topic X with replication factor 1 and 4 partitions.
   3. Verify that each broker has two partitions.
   4. Shut down broker 1.
   5. Start a producer sending data to topic X using KafkaProducer with
   required acks = 1.
   6. The producer hangs and does not exit.
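For step 5, the producer settings might look roughly like the sketch below. The broker addresses are assumptions, and the keys are, to our knowledge, those of the 0.8.2 Java producer; check them against the documentation for your exact version. The Properties would then be passed to the KafkaProducer constructor, which is left out here so the sketch compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

public class ReproProducerConfig {
    // Producer settings for the reproduction.
    // Assumption: brokers 0 and 1 listen on localhost:9092 and localhost:9093.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        // "required acks = 1": only the partition leader has to acknowledge a write.
        props.put("acks", "1");
        // Max time (ms) a send blocks while fetching metadata for a new topic.
        props.put("metadata.fetch.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // In the real test these Properties go to new KafkaProducer(...).
        System.out.println(build());
    }
}
```

Note that the metadata timeout only covers fetching the topic's metadata; here the metadata is available and the partitions simply have no leader, so this setting alone may not bound the hang.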

Offline partitions are supposed to be taken care of by the check for a null
leader (code attached below). However, the timeout setting does not seem to
work. I am not sure what is causing KafkaProducer to hang.

// choose the next available node in a round-robin fashion
for (int i = 0; i < numPartitions; i++) {
    int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
    if (partitions.get(partition).leader() != null)
        return partition;
}
// no partitions are available, give a non-available partition
return Utils.abs(counter.getAndIncrement()) % numPartitions;





On Fri, Feb 20, 2015 at 1:48 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Hello,
>
> I am experimenting with sending data to Kafka using KafkaProducer and found
> that when a partition is completely offline, e.g. a topic with replication
> factor = 1 and some broker is down, KafkaProducer seems to hang forever.
> It does not even exit when the timeout setting expires. Can you take a look?
>
> I checked the code and found that the partitioner chooses a partition based
> on the total number of partitions, including offline partitions. Is it
> possible to change ProducerClient to ignore offline partitions?
>
>
> Thanks,
>
> -Xiaoyu
>
>
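The suggestion quoted above, ignoring offline partitions, can be sketched by filtering out leaderless partitions first and doing the round-robin only over the rest. This is our own illustration, not Kafka's code: `Part` is a hypothetical stand-in for PartitionInfo, and the layout (broker 1 down, partitions 1 and 3 leaderless) is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AvailableOnlyPartitioner {
    // Hypothetical stand-in for PartitionInfo: partition id plus leader broker id.
    public static final class Part {
        public final int id;
        public final Integer leader; // null = offline
        public Part(int id, Integer leader) { this.id = id; this.leader = leader; }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    // Round-robin over partitions that currently have a leader; returns a partition id.
    public int choose(List<Part> all) {
        if (all.isEmpty()) throw new IllegalArgumentException("topic has no partitions");
        List<Part> available = new ArrayList<>();
        for (Part p : all) {
            if (p.leader != null) available.add(p);
        }
        int ticket = counter.getAndIncrement() & 0x7fffffff; // non-negative after overflow
        if (!available.isEmpty()) {
            return available.get(ticket % available.size()).id;
        }
        // No partition has a leader: fall back to round-robin over all partitions.
        return all.get(ticket % all.size()).id;
    }

    public static void main(String[] args) {
        // Broker 1 is down, so partitions 1 and 3 have no leader.
        List<Part> parts = Arrays.asList(
            new Part(1, null), new Part(0, 0), new Part(3, null), new Part(2, 0));
        AvailableOnlyPartitioner partitioner = new AvailableOnlyPartitioner();
        for (int k = 0; k < 4; k++) {
            System.out.print(partitioner.choose(parts) + " ");
        }
        System.out.println(); // prints "0 2 0 2 "
    }
}
```

Since the modulus is the number of available partitions rather than the total, every keyless record lands on a partition with a leader as long as one exists.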