Posted to dev@kafka.apache.org by "Jeff Wartes (JIRA)" <ji...@apache.org> on 2014/09/30 23:32:33 UTC

[jira] [Commented] (KAFKA-520) ConsumerIterator implemented by KafkaStream doesn't follow Java practices

    [ https://issues.apache.org/jira/browse/KAFKA-520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14153791#comment-14153791 ] 

Jeff Wartes commented on KAFKA-520:
-----------------------------------

This is still true two years later. 

Something that implements Iterator but whose hasNext() can never return false is clearly a broken implementation.
Blocking here requires extensive gymnastics to handle correctly in a concurrent architecture like Akka.

Is this deprecated? Is there a more preferred method of consumption?
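
One way to contain the blocking described above, as a minimal sketch only: run the blocking iterator on a dedicated thread and hand its elements to a java.util.concurrent.BlockingQueue, so callers can wait with a bounded timeout. The class name IteratorPump and the method pump are hypothetical; the source is any java.util.Iterator, not a specific Kafka class, and the sketch assumes hasNext() throws no exception of its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IteratorPump {

    /**
     * Drains a possibly blocking iterator into a bounded queue on a daemon thread.
     * Only the pump thread ever blocks inside hasNext(); callers use the queue.
     */
    public static <T> BlockingQueue<T> pump(Iterator<T> source, int capacity) {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>(capacity);
        Thread worker = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    // put() waits when the queue is full, which gives back-pressure
                    queue.put(source.next());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end
                Thread.currentThread().interrupt();
            }
        }, "iterator-pump");
        worker.setDaemon(true);
        worker.start();
        return queue;
    }

    public static void main(String[] args) throws InterruptedException {
        // A finite in-memory iterator stands in for the blocking one
        BlockingQueue<String> queue = pump(Arrays.asList("a", "b").iterator(), 16);
        // poll() returns null on timeout instead of blocking forever
        String first = queue.poll(500, TimeUnit.MILLISECONDS);
        System.out.println(first);
    }
}
```

A null from poll() then means "nothing arrived within the timeout", which the calling code (an actor, for example) can treat as an ordinary outcome rather than an error.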

> ConsumerIterator implemented by KafkaStream doesn't follow Java practices
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-520
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.7, 0.7.1
>            Reporter: Esko Suomi
>
> As a foreword, this only applies to Java conventions - if things are different on the Scala side, then so be it and that's fine.
> As mentioned in the summary, ConsumerIterator doesn't follow proper Java practices; to be exact, it doesn't follow them in its behavior. The biggest offender is the #hasNext() method, which blocks until ConsumerTimeoutException is thrown. While it is obvious that this is because the targeted use case is consuming a given topic indefinitely, it confused me as an API integration programmer, since the documentation was severely lacking and I only started to observe this problem in our staging environment.
> There are multiple ways that I find appropriate to fix this:
> - Instead of implementing java.util.Iterator, make the class an implementation of BlockingQueue. Since BlockingQueue is in the java.util.concurrent package, it should immediately point the user toward the class's actual semantics.
> - Get rid of the concept of internal infinite iteration and instead make the Iterator represent one fetched block of data; that way the infinite loop for consuming can be something like
> while (!Thread.interrupted()) {
>     Iterator it = ks.readMore(...);
>     while (it.hasNext()) {
>         /* consume messages */
>     }
> }
> In addition to a clearer Java API, this also gets rid of the exception being used for flow control, which, once again, doesn't fit Java best practices.
> - Update the documentation (both API and quickstart) to explain how to recover from such a failure.
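
The "one fetched block per Iterator" proposal above can be sketched as follows. This is an illustration under assumptions, not an existing Kafka API: the class BatchReader and its readMore signature are hypothetical, and an in-memory BlockingQueue stands in for the stream. Each call returns an ordinary finite Iterator, so hasNext() returns false normally and no exception is needed for flow control.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchReader {

    /**
     * Waits up to timeoutMs for a first element, then takes whatever else is
     * already queued, up to maxItems in total. An empty iterator means
     * "no data arrived in time".
     */
    public static <T> Iterator<T> readMore(BlockingQueue<T> queue, int maxItems, long timeoutMs)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        T first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            // drainTo never blocks; it moves only elements that are already available
            queue.drainTo(batch, maxItems - 1);
        }
        return batch.iterator();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("m1", "m2", "m3"));
        boolean sawEmptyBatch = false;
        // The demo stops at the first empty batch; a real consumer would keep looping
        while (!Thread.currentThread().isInterrupted() && !sawEmptyBatch) {
            Iterator<String> it = readMore(queue, 2, 100);
            sawEmptyBatch = !it.hasNext();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}
```

With the three queued messages and a batch size of 2, the loop prints m1 and m2 from the first batch and m3 from the second, then exits when the third call times out and returns an empty iterator.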


