Posted to users@kafka.apache.org by Oleg Zhurakousky <oz...@hortonworks.com> on 2016/03/02 20:24:36 UTC

Consumer deadlock

Guys

We have a consumer deadlock and here is the relevant dump:

"Timer-Driven Process Thread-10" Id=76 TIMED_WAITING  on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        . . . . .

What worries me is that ‘hasNext’ is essentially a blocking operation. I can’t find a single case where that would be useful, hence I am calling it a bug, but hopefully someone can clarify.
Kafka version is 0.8.*

Cheers
Oleg


Re: Consumer deadlock

Posted by Jason Gustafson <ja...@confluent.io>.
Can you post some logs from the consumer? That should tell us what it's
busy doing while hanging. You may have to enable DEBUG level.

-Jason

On Thu, Mar 3, 2016 at 5:02 PM, Muthukumaran K <mu...@ericsson.com>
wrote:

> Hi Jason,
>
> I am using 0.9 broker.

RE: Consumer deadlock

Posted by Muthukumaran K <mu...@ericsson.com>.
Hi Jason, 

I am using 0.9 broker. 

One more observation: I had written producer code against 0.9, and even with the producer I had a hanging issue where the send method hung while requesting metadata. Thread dump below:

"main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() [0x00000000025bf000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
            at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
            - locked <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
            at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
            at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Then I set metadata.fetch.timeout.ms=1 and the producer started working. But when I poll the same topic using kafka-console-consumer.sh, the console consumer also hangs.



Regards
Muthu


-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Friday, March 04, 2016 5:33 AM
To: users@kafka.apache.org
Subject: Re: Consumer deadlock

Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new consumer needs 0.9. That probably would explain the hanging.

-Jason


Re: Consumer deadlock

Posted by Jason Gustafson <ja...@confluent.io>.
Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new
consumer needs 0.9. That probably would explain the hanging.

-Jason

On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <mu...@ericsson.com>
wrote:

> Ewen,
>
> By the new consumer API, do you mean KafkaConsumer? I have an issue with
> poll() in 0.9.0.1: it hangs indefinitely even with a timeout.

RE: Consumer deadlock

Posted by Muthukumaran K <mu...@ericsson.com>.
Ewen, 

By the new consumer API, do you mean KafkaConsumer? I have an issue with poll() in 0.9.0.1: it hangs indefinitely even with a timeout.

Here is the consumer code I am using. Any pointers would be helpful.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id,
                        String groupId,
                        List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("metadata.fetch.timeout.ms", 1);

        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            System.out.println("Starting consumer ID : " + id +
                    " Thread : " + Thread.currentThread().getName() +
                    " Topic : " + topics.toString() +
                    " ... ");
            long startTime = System.currentTimeMillis();
            int recordCount = 0;

            consumer.subscribe(topics);

            System.out.println("Consumer-ID " + id + " after subscribe...");

            while (true) {
                // Wait up to 10 seconds for records
                ConsumerRecords<String, String> records = consumer.poll(10000);

                System.out.println("Consumer-ID " + id + " after poll...");

                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(
                            "Consumer-ID : " + this.id +
                                    ": " + data +
                                    " Thread_name : " + Thread.currentThread().getName());
                    recordCount++;
                }

                // Work in milliseconds: dividing by whole elapsed seconds would
                // divide by zero during the first second of the run.
                long elapsedMs = System.currentTimeMillis() - startTime;
                long rate = elapsedMs > 0 ? recordCount * 1000L / elapsedMs : 0;
                System.out.println("###### rate : " + rate + " msgs/sec on Consumer ID " + id);
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

    // Called from another thread to break out of a blocking poll()
    public void shutdown() {
        consumer.wakeup();
    }
}

Regards
Muthu


-----Original Message-----
From: Ewen Cheslack-Postava [mailto:ewen@confluent.io] 
Sent: Thursday, March 03, 2016 2:21 PM
To: users@kafka.apache.org
Subject: Re: Consumer deadlock

Take a look at the consumer.timeout.ms setting if you don't want the iterator to block indefinitely.

A better long-term solution is to switch to the new consumer, though that obviously requires much more significant code changes. The new consumer API is a single-threaded, poll-based API where you can always specify a timeout on the consumer's poll() method (though it currently has some limitations in how it enforces that timeout).

-Ewen


Re: Consumer deadlock

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Yes, mapping an iterator interface to a networked service can have some
drawbacks :) Providing a simple, familiar interface to Kafka is challenging
-- if you didn't block and hasNext() returned false, what would you do
after that? That return value doesn't indicate that the iterable for a
Kafka topic ended, just that there wasn't anything available *immediately*.
But once hasNext() returns false, it's expected that it will always return
false and that there won't be more elements from the Iterator. With a Kafka
topic, there isn't really an end to the collection, there just might not be
any data available immediately.
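
To make the trade-off concrete, here is a minimal sketch of an iterator whose hasNext() waits on a LinkedBlockingQueue, which is what the stack trace at the top of the thread shows ConsumerIterator doing. It is not Kafka's implementation: BlockingQueueIterator and QueueTimeoutException are hypothetical names, and the timeout handling only approximates the behaviour of the consumer.timeout.ms setting mentioned elsewhere in this thread.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; this is not Kafka's ConsumerIterator.
class QueueTimeoutException extends RuntimeException {
    QueueTimeoutException(String message) { super(message); }
}

class BlockingQueueIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs; // negative means wait forever
    private T next;               // element fetched by hasNext() but not yet handed out

    BlockingQueueIterator(BlockingQueue<T> queue, long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        try {
            if (timeoutMs < 0) {
                next = queue.take(); // parks the thread until an element arrives
            } else {
                next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (next == null) {
                    // "Nothing yet" is reported as an exception rather than false,
                    // so the iterator is never marked as exhausted.
                    throw new QueueTimeoutException("no element within " + timeoutMs + " ms");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueueTimeoutException("interrupted while waiting");
        }
        return true;
    }

    @Override
    public T next() {
        hasNext(); // blocks, or throws, if nothing is buffered
        T result = next;
        next = null;
        return result;
    }
}

class BlockingIteratorDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        Iterator<String> it = new BlockingQueueIterator<>(queue, 100);
        System.out.println(it.hasNext() + " " + it.next());
        try {
            it.hasNext(); // queue is now empty: waits about 100 ms, then throws
        } catch (QueueTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Note that hasNext() in this sketch never returns false: it either finds an element, waits for one, or throws. That is the design choice under discussion, since returning false would tell Iterator callers the stream had ended.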

When it comes down to it, there's a mismatch between the Java Iterator
interface and what Kafka is trying to provide. That's one of the reasons
the consumer interface has been rethought and looks significantly different
in the new consumer:
http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

-Ewen

On Thu, Mar 3, 2016 at 1:51 AM, Oleg Zhurakousky <
ozhurakousky@hortonworks.com> wrote:

> Thank you Ewen, I'll give it a shot, but I am still puzzled by the
> reasoning behind implementing hasNext() in a way that can block. Isn't
> that counter to the whole point of the method? You either have it or you
> don't. Blocking on the off chance that an element may arrive at some point
> in the future simply means it's not there at the moment, so why not return
> false and let the user retry?
>
> Sent from my iPhone



-- 
Thanks,
Ewen

Re: Consumer deadlock

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Thank you Ewen, I'll give it a shot, but I am still puzzled by the reasoning behind implementing hasNext() in a way that can block. Isn't that counter to the whole point of the method? You either have it or you don't. Blocking on the off chance that an element may arrive at some point in the future simply means it's not there at the moment, so why not return false and let the user retry?

Sent from my iPhone


Re: Consumer deadlock

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Take a look at the consumer.timeout.ms setting if you don't want the
iterator to block indefinitely.
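
For the 0.8 high-level consumer, that setting goes into the properties used to build the consumer configuration. A minimal sketch follows; OldConsumerProps is a made-up helper, and the ZooKeeper address, group name and 5000 ms value are placeholders rather than recommendations. Creating the consumer itself from these properties is left out.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are real consumer settings.
class OldConsumerProps {
    static Properties withTimeout(long timeoutMs) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        // The default is -1, which blocks forever. With a non-negative value the
        // iterator throws kafka.consumer.ConsumerTimeoutException when no message
        // arrives within the interval.
        props.setProperty("consumer.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withTimeout(5000).getProperty("consumer.timeout.ms"));
    }
}
```

With a timeout configured, the loop that calls hasNext() needs to catch the timeout exception and decide whether to retry or stop.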

A better long-term solution is to switch to the new consumer, though that
obviously requires much more significant code changes. The new consumer API
is a single-threaded, poll-based API where you can always specify a timeout
on the consumer's poll() method (though it currently has some limitations
in how it enforces that timeout).
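
A rough sketch of what a poll-with-timeout interface looks like from the caller's side, using only java.util.concurrent. PollingSource is a hypothetical stand-in, not KafkaConsumer. The point is that an empty batch only means nothing arrived within the timeout, and the caller decides whether to poll again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a poll-style source; not the KafkaConsumer API.
class PollingSource<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    void publish(T item) {
        buffer.add(item);
    }

    // Waits up to timeoutMs for data. An empty list means "nothing yet",
    // not "finished".
    List<T> poll(long timeoutMs) {
        List<T> batch = new ArrayList<>();
        try {
            T first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                buffer.drainTo(batch); // take whatever else is buffered, without waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}

class PollLoopDemo {
    public static void main(String[] args) {
        PollingSource<String> source = new PollingSource<>();
        source.publish("m1");
        source.publish("m2");
        for (int i = 0; i < 2; i++) {
            List<String> batch = source.poll(100);
            System.out.println("poll " + i + " -> " + batch);
        }
    }
}
```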

-Ewen


-- 
Thanks,
Ewen