Posted to dev@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2014/05/05 20:09:14 UTC

Re: Review Request 19731: Patch for KAFKA-1328

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19731/#review41974
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75659>

    Did you mean an unchecked exception? If so, yes. This is consistent with the producer, I think.



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75669>

    Right. I started with a simple list of ConsumerRecord but found it very painful while writing the examples. Basically, whether clients prefer collation by topic or by partition depends on the nature of the use case, the thread-pool processing strategy, and whether they subscribe by topic or by partition. Another issue is figuring out how to throw a per-partition exception. This led to an API design that allows collation by topic as well as by partition. If we need to throw partition-level exceptions, ConsumerRecordMetadata would be one way of exposing those.
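
    As a rough sketch of how a caller could consume the collated result (names and signatures follow this patch's proposal and may still change; imports elided):

        // Sketch only: poll() returns records collated by topic in this proposal.
        Map<String, List<ConsumerRecord>> records = consumer.poll(100);   // timeout in ms
        for (Map.Entry<String, List<ConsumerRecord>> entry : records.entrySet()) {
            String topic = entry.getKey();
            for (ConsumerRecord record : entry.getValue()) {
                // partition() returns a TopicPartition, so per-partition collation
                // can be done by the application if it needs it
                process(topic, record.partition(), record);   // process() is a placeholder
            }
        }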



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75670>

    Yes. If the user is not interested in some partitions, it is best to unsubscribe. commit() will always commit offsets for all subscribed partitions owned by the consumer.
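
    For example (a sketch using the names in this patch; the partition-level unsubscribe overload is assumed):

        // Stop consuming a partition we are not interested in, then commit;
        // commit() covers all partitions this consumer is still subscribed to.
        consumer.unsubscribe(new TopicPartition("my-topic", 1));
        consumer.commit();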



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75790>

    Yes, I thought of that, but changed it to a separate OffsetMetadata object for a couple of reasons:
    
    1. It provides the flexibility to expose more information if required. For example, we could return the last committed offset if the commit for a particular partition fails.
    2. It stays consistent with the producer client APIs, where we don't return an error code to the user but instead throw an exception when the data is accessed (in this case, the offset).
    
    However, I see your point about returning complex data. I think it can be simplified by returning a Future of OffsetMetadata and changing OffsetMetadata to have an offset() API that either returns the last committed offset or throws an exception. That might address your concern as well as get us the flexibility and consistency.
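
    Roughly (a sketch of the proposed shape only):

        // commit() returns a Future; OffsetMetadata.offset() either returns the
        // last committed offset or throws if the commit failed.
        Future<OffsetMetadata> future = consumer.commit();
        OffsetMetadata metadata = future.get();
        long committed = metadata.offset();   // throws an exception on commit failure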
    
    Thoughts?



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75885>

    Yes, that is better for consistency.



clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
<https://reviews.apache.org/r/19731/#comment75886>

    This is already a batch API.



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment75896>

    auto.offset.reset=disable will expect the consumer to set the offset before the first poll(). This can be done using commit() and seek().
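
    For example (sketch only; the config key and the map-of-offsets form of seek() are as proposed in this patch):

        // With auto.offset.reset=disable, the application positions the consumer
        // itself before the first poll(), e.g. via seek().
        TopicPartition tp = new TopicPartition("my-topic", 0);
        consumer.subscribe(tp);
        consumer.seek(Collections.singletonMap(tp, 42L));   // start from a known offset
        Map<String, List<ConsumerRecord>> records = consumer.poll(100);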



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment75898>

    The memory management on the consumer is going to require server-side changes. For example, if the consumer's fetch request carries an overall limit (set using total.memory.bytes) and a per-partition fetch.buffer.bytes, the server will return at least fetch.buffer.bytes from a subset of the n partitions, selecting those partitions either round-robin or randomly. We can discuss more details in the design review.
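
    As a sketch of how the two settings relate (names as discussed here; exact semantics to be settled in the design review):

        Properties props = new Properties();
        // Overall bound on memory used for fetched data across all partitions
        props.put("total.memory.bytes", "67108864");
        // Amount of data fetched per partition per request
        props.put("fetch.buffer.bytes", "1048576");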



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/19731/#comment75899>

    Removed



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment75900>

    The object returned from partition() is a TopicPartition on purpose. I realized that returning just the partition id from this API is not useful, since all other APIs in the consumer accept a TopicPartition. The constructor parameter can be renamed to partitionId.
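
    That lets the result feed directly into the other APIs, e.g. (sketch; the offset() accessor and seek() signature are assumed):

        TopicPartition tp = record.partition();   // a TopicPartition, not a bare partition id
        consumer.seek(Collections.singletonMap(tp, record.offset()));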



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment75901>

    Should we? It is a little odd for the returned record to have an API called nextOffset(), especially since we are moving away from an iterator-like API to a collection-of-records style API. The downside, of course, is that we are exposing the assumption that the offset of the next available message is currentOffset+1. However, I would argue that this is the most logical behavior to expect from Kafka, and one that we should never change.
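
    In other words (sketch, assuming the commit-by-map form from this patch):

        // nextOffset() is simply currentOffset + 1, i.e. the offset of the next
        // message to consume; committing it resumes right after this record.
        long next = record.offset() + 1;   // equivalent to record.nextOffset()
        consumer.commit(Collections.singletonMap(record.partition(), next));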



clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
<https://reviews.apache.org/r/19731/#comment75902>

    This is to keep it consistent with the producer side. Also, I think that from a user's perspective an exception is more intuitive than an integer error code, no?
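
    Roughly (sketch; value() is assumed as the accessor that surfaces the error, and process() is a placeholder):

        for (ConsumerRecord record : records.get("my-topic")) {
            try {
                process(record.value());   // throws if an error was detected for this partition
            } catch (Exception e) {
                // handle the per-partition error without failing the entire poll()
            }
        }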



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75903>

    Yes, it is probably confusing since we haven't discussed the design and implementation in detail, so we can hold off on describing this until that is complete. The question is: can you pass the KafkaConsumer object to multiple threads and have them all call poll()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75907>

    I intend to do that once people have finished reviewing the APIs.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75909>

    I included this example on purpose, since it is one of the more complex combinations of group management and offset management, so we should not replace it with the example you mentioned. I'd rather add that example alongside it. However, I'd prefer to do that after the API reviews are complete.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/19731/#comment75912>

    You can use this to determine how to use seek(). We can add an example, but I'm not so sure which example would make the most sense at this point. 
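
    For instance, a simple rewind might look like this (sketch only; the position() and seek() signatures are assumed from this patch):

        TopicPartition tp = new TopicPartition("my-topic", 0);
        long current = consumer.position(tp);   // current fetch position
        consumer.seek(Collections.singletonMap(tp, Math.max(0L, current - 100)));   // rewind by 100 messages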


- Neha Narkhede


On April 13, 2014, 2:12 a.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19731/
> -----------------------------------------------------------
> 
> (Updated April 13, 2014, 2:12 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1328
>     https://issues.apache.org/jira/browse/KAFKA-1328
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Fixed the javadoc usage examples in KafkaConsumer to match the API changes
> 
> 
> Changed the signature of poll to return Map<String,ConsumerRecordMetadata> to organize the ConsumerRecords around topic and then optionally around partition. This will serve the group management as well as custom partition subscription use cases
> 
> 
> 1. Changed the signature of poll() to return Map<String, List<ConsumerRecord>> 2. Changed ConsumerRecord to throw an exception if an error is detected for the partition. For example, if a single large message is larger than the total memory just for that partition, we don't want poll() to throw an exception since that will affect the processing of the remaining partitions as well
> 
> 
> Fixed MockConsumer to make subscribe(topics) and subscribe(partitions) mutually exclusive
> 
> 
> Changed the package to org.apache.kafka.clients.consumer from kafka.clients.consumer
> 
> 
> Changed the package to org.apache.kafka.clients.consumer from kafka.clients.consumer
> 
> 
> 1. Removed the commitAsync() APIs 2. Changed the commit() APIs to return a Future
> 
> 
> Fixed configs to match the producer side configs for metrics
> 
> 
> Renamed AUTO_COMMIT_ENABLE_CONFIG to ENABLE_AUTO_COMMIT_CONFIG
> 
> 
> Addressing review comments from Tim and Guozhang
> 
> 
> Rebasing after producer side config cleanup
> 
> 
> Added license headers
> 
> 
> Cleaned javadoc for ConsumerConfig
> 
> 
> Fixed minor indentation in ConsumerConfig
> 
> 
> Improve docs on ConsumerConfig
> 
> 
> 1. Added ClientUtils 2. Added basic constructor implementation for KafkaConsumer
> 
> 
> Improved MockConsumer
> 
> 
> Chris's feedback and also consumer rewind example code
> 
> 
> Added commit() and commitAsync() APIs to the consumer and updated docs and examples to reflect that
> 
> 
> 1. Added consumer usage examples to javadoc 2. Changed signature of APIs that accept or return offsets from list of offsets to map of offsets
> 
> 
> Improved example for using ConsumerRebalanceCallback
> 
> 
> Improved example for using ConsumerRebalanceCallback
> 
> 
> Included Jun's review comments and renamed positions to seek. Also included position()
> 
> 
> Changes to javadoc for positions()
> 
> 
> Changed the javadoc for ConsumerRebalanceCallback
> 
> 
> Changing unsubscribe to also take in var args for topic list
> 
> 
> Incorporated first round of feedback from Jay, Pradeep and Mattijs on the mailing list
> 
> 
> Updated configs
> 
> 
> Javadoc for consumer complete
> 
> 
> Completed docs for Consumer and ConsumerRebalanceCallback. Added MockConsumer
> 
> 
> Added the initial interfaces and related documentation for the consumer. More docs required to complete the public API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/kafka/common/TopicPartitionOffset.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/FutureOffsetMetadata.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java a6423f4b37a57f0290e2048b764de1218470f4f7 
>   clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/19731/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>