Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/10 05:05:27 UTC

[GitHub] [kafka] vvcephei opened a new pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei opened a new pull request #10096:
URL: https://github.com/apache/kafka/pull/10096


   * Revert the default Consumer#poll behavior back to early return on records only
   * Add config to enable early return on metadata or records
   * Set the return-on-metadata config in Streams to support KIP-695
   * Revert the undesired addition of Hamcrest to the Client module
   * Revert the now unnecessary poll-until-records logic in PlaintextConsumerTest
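The opt-in behavior listed above can be modeled in a short, self-contained Java sketch. This is not the KafkaConsumer implementation. `LongPollMode`, `FetchRound`, and `LongPollDecision` are hypothetical names, and only the two string values come from the constants proposed in this PR (`LONG_POLL_RETURN_ON_RECORDS` and `LONG_POLL_RETURN_ON_RESPONSE`). The sketch shows when a poll would stop blocking in each mode.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of the two modes proposed in this PR; only the string
// values mirror the proposed LONG_POLL_RETURN_ON_* constants.
enum LongPollMode {
    RETURN_ON_RECORDS("return_on_records"),
    RETURN_ON_RESPONSE("return_on_response");

    final String configValue;

    LongPollMode(String configValue) {
        this.configValue = configValue;
    }
}

// Hypothetical stand-in for the outcome of one round of fetch responses.
final class FetchRound {
    final List<String> records;
    final Map<String, Long> metadata; // partition -> end offset, illustrative only

    FetchRound(List<String> records, Map<String, Long> metadata) {
        this.records = records;
        this.metadata = metadata;
    }
}

final class LongPollDecision {
    private LongPollDecision() {}

    // Returns true if poll() should stop blocking after this round.
    static boolean shouldReturn(LongPollMode mode, FetchRound round) {
        if (!round.records.isEmpty()) {
            return true; // both modes return as soon as records arrive
        }
        // A metadata-only response ends the poll only in the opt-in mode.
        return mode == LongPollMode.RETURN_ON_RESPONSE && !round.metadata.isEmpty();
    }
}
```

Under these assumptions, a metadata-only round ends the poll only in the `return_on_response` mode, while a round that carries records ends it in both modes.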
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573459714



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1201,6 +1201,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         // disable auto topic creation
         consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
 
+        // enable early return on metadata
+        consumerProps.put(ConsumerConfig.LONG_POLL_MODE_CONFIG, ConsumerConfig.LONG_POLL_RETURN_ON_RECORDS);

Review comment:
       It should be `LONG_POLL_RETURN_ON_RESPONSE` rather than `LONG_POLL_RETURN_ON_RECORDS`
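Here is a minimal sketch of the corrected override. It assumes the key `long.poll.mode` and the value strings proposed in this PR, and it uses plain `java.util.Properties` with string literals instead of the `ConsumerConfig` constants so that it compiles without the Kafka clients jar. `StreamsConsumerOverrides` is a hypothetical helper. The config itself is not assumed to exist in any released client, since this PR was closed and replaced by #10137.

```java
import java.util.Properties;

final class StreamsConsumerOverrides {
    // Hypothetical: key and values as proposed in PR #10096.
    static final String LONG_POLL_MODE_CONFIG = "long.poll.mode";
    static final String LONG_POLL_RETURN_ON_RECORDS = "return_on_records";
    static final String LONG_POLL_RETURN_ON_RESPONSE = "return_on_response";

    private StreamsConsumerOverrides() {}

    static Properties apply(Properties consumerProps) {
        // disable auto topic creation ("allow.auto.create.topics" is the
        // value of ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG)
        consumerProps.put("allow.auto.create.topics", "false");
        // enable early return on metadata: this must be RETURN_ON_RESPONSE,
        // because RETURN_ON_RECORDS is the default and changes nothing
        consumerProps.put(LONG_POLL_MODE_CONFIG, LONG_POLL_RETURN_ON_RESPONSE);
        return consumerProps;
    }
}
```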

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1243,17 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+
+                // We only need to save off the metadata if we are NOT supposed to return early on metadata-only responses
+                if (!longPollExitEarlyOnMetadata) {

Review comment:
       How about updating `nullableSeenMetadata` only if we don't return records?
   ```java
   if (longPollShouldReturn(records)) { 
    ...
   } else {
     if (nullableSeenMetadata == null) {
       nullableSeenMetadata = new HashMap<>(records.metadata());
     } else {
       nullableSeenMetadata.putAll(records.metadata());
     }
   }
   ```
   
   The benefit of the code above is that we don't need to handle duplicate metadata, which would exist in both `FetchedRecords` and `nullableSeenMetadata` when we succeed in getting records and metadata in the first loop.
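The suggestion can be sketched outside of KafkaConsumer. `Fetched`, `PollResult`, `MetadataAccumulatingPoll`, and the `returnOnResponse` flag are hypothetical stand-ins for `FetchedRecords`, `ConsumerRecords`, the poll loop, and the proposed mode. An iterator of fetch rounds stands in for repeated `pollForFetches(timer)` calls, and running out of rounds stands in for the timer expiring. Metadata is saved only from rounds that do not end the poll, so a round that returns immediately never has its metadata copied twice.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FetchedRecords: records plus per-partition metadata.
final class Fetched {
    final List<String> records;
    final Map<String, Long> metadata;

    Fetched(List<String> records, Map<String, Long> metadata) {
        this.records = records;
        this.metadata = metadata;
    }
}

// Hypothetical stand-in for the ConsumerRecords handed back to the caller.
final class PollResult {
    final List<String> records;
    final Map<String, Long> metadata;

    PollResult(List<String> records, Map<String, Long> metadata) {
        this.records = records;
        this.metadata = metadata;
    }
}

final class MetadataAccumulatingPoll {
    private MetadataAccumulatingPoll() {}

    // Records always end the poll; metadata alone ends it only when returnOnResponse is set.
    private static boolean longPollShouldReturn(Fetched fetched, boolean returnOnResponse) {
        return !fetched.records.isEmpty() || (returnOnResponse && !fetched.metadata.isEmpty());
    }

    static PollResult poll(Iterator<Fetched> rounds, boolean returnOnResponse) {
        Map<String, Long> nullableSeenMetadata = null;
        while (rounds.hasNext()) {
            Fetched fetched = rounds.next();
            if (longPollShouldReturn(fetched, returnOnResponse)) {
                // Older metadata first, then this round's entries override it.
                Map<String, Long> merged = nullableSeenMetadata == null
                    ? new HashMap<String, Long>() : nullableSeenMetadata;
                merged.putAll(fetched.metadata);
                return new PollResult(fetched.records, merged);
            }
            // Not returning yet: save this round's metadata, as suggested in the review.
            if (nullableSeenMetadata == null) {
                nullableSeenMetadata = new HashMap<>(fetched.metadata);
            } else {
                nullableSeenMetadata.putAll(fetched.metadata);
            }
        }
        // Rounds exhausted (the "timer expired"); assumption: return empty records
        // together with any metadata seen so far.
        Map<String, Long> seen = nullableSeenMetadata == null
            ? Collections.<String, Long>emptyMap() : nullableSeenMetadata;
        return new PollResult(Collections.<String>emptyList(), seen);
    }
}
```

Returning the accumulated metadata on timeout is an assumption made for this sketch. The excerpt does not show what the PR returns in that case.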
    







[GitHub] [kafka] vvcephei commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573853644



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########

Review comment:
       Oy, you are right. Thanks!







[GitHub] [kafka] vvcephei commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573924978



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##########
@@ -71,6 +71,21 @@
     /** <code>max.poll.interval.ms</code> */
     public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
     private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
+
+    /** <code>long.poll.mode</code> */
+    public static final String LONG_POLL_RETURN_ON_RECORDS = "return_on_records";
+    public static final String LONG_POLL_RETURN_ON_RESPONSE = "return_on_response";
+    public static final String LONG_POLL_MODE_CONFIG = "long.poll.mode";
+    public static final String LONG_POLL_MODE_DOC = "Whether a call to Consumer#poll(...) should block until" +
+        " records are received (return_on_records), or should return early if a metadata-only response is received" +
+        " from the brokers (return_on_response). The 'return_on_records' mode is the default, and it matches prior" +
+        " behavior. The 'return_on_response' mode allows callers to witness topic metadata changes via" +
+        " ConsumerRecords#metadata() as soon as they are received in a fetch response, even if there are no records" +
+        " included in the response. Callers who are only interested in seeing records should leave the default in" +
+        " place, and callers who are interested in maintaining fresh local information about the current lag should" +
+        " enable 'return_on_response'." +
+        " Note that the Consumer metrics are always updated upon receipt of the fetch responses.";

Review comment:
       Thanks @ijuma .
   
   I do agree with you. In the vote thread, others made the case that the behavior change is not ok. If you disagree with the conclusion of the vote thread and want to argue that it actually is ok for `Consumer#poll` to return when there are only metadata updates, I'm happy to reconsider the decision.
   
   I didn't update the text of the KIP yet because I suspected that this PR might inspire a new round of discussion.







[GitHub] [kafka] vvcephei commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573855741



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########

Review comment:
       sounds good!







[GitHub] [kafka] vvcephei closed pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei closed pull request #10096:
URL: https://github.com/apache/kafka/pull/10096


   





[GitHub] [kafka] vvcephei commented on pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#issuecomment-788943114


   Replaced by https://github.com/apache/kafka/pull/10137





[GitHub] [kafka] vvcephei commented on pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#issuecomment-776446769


   Hey @rajinisivaram and @chia7712,
   
   Here is my proposed fix for https://issues.apache.org/jira/browse/KAFKA-12268, based on our discussion in https://github.com/apache/kafka/pull/10022 and on the mailing list (https://lists.apache.org/thread.html/ra3a73e74c2bb9768ce33dcf0492e2f0fd2f120044541c6b216a67cca%40%3Cdev.kafka.apache.org%3E).
   
   I don't know if anyone has a preference on the config name itself. I also considered just `enable.early.poll.return.on.metadata=false|true`, but _maybe_ there will be some other behavior we might add as another "mode" in the future. No strong opinion here.
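The trade-off between a boolean flag and a mode can be illustrated with a hypothetical parser for `long.poll.mode`. `PollMode` and `fromConfigs` are illustrative names, not code from this PR. The default of `return_on_records` follows the proposed config doc, which says that mode matches prior behavior, and unknown values are rejected rather than silently falling back to the default.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical parser for the proposed long.poll.mode setting.
enum PollMode {
    RETURN_ON_RECORDS,
    RETURN_ON_RESPONSE;

    static final String CONFIG = "long.poll.mode";

    // Config string for this mode, e.g. "return_on_records".
    String configValue() {
        return name().toLowerCase(Locale.ROOT);
    }

    // Reads the mode from raw client configs, defaulting to the prior behavior.
    static PollMode fromConfigs(Map<String, ?> configs) {
        Object raw = configs.get(CONFIG);
        if (raw == null) {
            return RETURN_ON_RECORDS;
        }
        String value = raw.toString().trim();
        for (PollMode mode : values()) {
            if (mode.configValue().equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Invalid value '" + raw + "' for " + CONFIG);
    }
}
```

A boolean such as `enable.early.poll.return.on.metadata` can express exactly two behaviors, so a third behavior would need another config. With a mode, it would be one more enum constant and one more accepted string.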
   
   Also, please take a look at the logic in the Consumer. I did my best to make it both efficient and legible, but I'm not sure I achieved either goal.
   
   Thanks for spotting the problem and for the discussion, and thanks in advance for the reviews!
   
   Also, @guozhangwang, since you were the original reviewer of the feature, I'd appreciate your review if you have time.





[GitHub] [kafka] ijuma commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

ijuma commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573860208



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##########

Review comment:
       This adds yet another config for something people should not really have to care about. I think you should revive the KIP thread with the proposed updates and give people a chance to comment on them. Also, the KIP should cover the compatibility impact that motivates this update.







[GitHub] [kafka] chia7712 commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

chia7712 commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573463351



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2480,4 +2508,38 @@ String getClientId() {
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
         return updateAssignmentMetadataIfNeeded(timer, true);
     }
+
+    /**
+     * Converts the fetched metadata to the public API ConsumerRecords.Metadata.
+     * Also computes overrides from left to the right in the case of multiple args.
+     * Any of the arg maps may be null, in which case, we just skip it.
+     */
+    @SafeVarargs
+    private static Map<TopicPartition, ConsumerRecords.Metadata> extractMetadata(final Map<TopicPartition, FetchedRecords.FetchMetadata>... nullableMetadataMaps) {
+        Map<TopicPartition, ConsumerRecords.Metadata> result = null;
+        for (Map<TopicPartition, FetchedRecords.FetchMetadata> nullableMetadataMap : nullableMetadataMaps) {
+            if (nullableMetadataMap == null || nullableMetadataMap.isEmpty()) {
+                continue;
+            } else {
+                if (result == null) {
+                    result = new HashMap<>(nullableMetadataMap.size());
+                }
+                for (Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : nullableMetadataMap.entrySet()) {
+                    result.put(
+                        entry.getKey(),
+                        new ConsumerRecords.Metadata(

Review comment:
       I'm not sure why we need `FetchedRecords.FetchMetadata`. It is always converted to `ConsumerRecords.Metadata` directly, and the `records` in `FetchedRecords` are already `ConsumerRecord`s. Maybe we can make `FetchedRecords` use `ConsumerRecords.Metadata` as well.
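The left-to-right override described in the `extractMetadata` Javadoc can be written generically. In this sketch, `MetadataMerge.mergeLeftToRight` is a hypothetical helper: null or empty maps are skipped, later maps override earlier ones, and a conversion function stands in for building `ConsumerRecords.Metadata` from `FetchedRecords.FetchMetadata`. Returning null when every input is null or empty is an assumption, since the quoted excerpt is cut off before its return statement.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class MetadataMerge {
    private MetadataMerge() {}

    // Merges maps from left to right; later maps override earlier ones.
    // Null or empty maps are skipped; returns null if nothing was merged.
    @SafeVarargs
    static <K, A, B> Map<K, B> mergeLeftToRight(Function<A, B> convert, Map<K, A>... nullableMaps) {
        Map<K, B> result = null;
        for (Map<K, A> map : nullableMaps) {
            if (map == null || map.isEmpty()) {
                continue;
            }
            if (result == null) {
                result = new HashMap<>(map.size());
            }
            for (Map.Entry<K, A> entry : map.entrySet()) {
                result.put(entry.getKey(), convert.apply(entry.getValue()));
            }
        }
        return result;
    }
}
```

If `FetchedRecords` stored `ConsumerRecords.Metadata` directly, as suggested, the converter would reduce to `Function.identity()`.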







[GitHub] [kafka] ijuma commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

ijuma commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573812058



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##########

Review comment:
       In what KIP was this approved?







[GitHub] [kafka] ijuma commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

ijuma commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573811199



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##########

Review comment:
       Hmm, this doesn't seem great.







[GitHub] [kafka] vvcephei commented on a change in pull request #10096: KAFKA-12268: Make early poll return opt-in

vvcephei commented on a change in pull request #10096:
URL: https://github.com/apache/kafka/pull/10096#discussion_r573852813



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##########

Review comment:
       Thanks @ijuma ,
   
   This is KIP-695. The currently approved version of it includes only a behavior change and no config. The behavior change is that, if you call `Consumer#poll` and there are some completed fetch responses but none of them have records, we still return a `ConsumerRecords` object with no records so that we can return the new metadata (via `ConsumerRecords#metadata()`).
   
   That behavior change struck @chia7712 and @rajinisivaram as not ok (for reasons discussed in https://lists.apache.org/thread.html/ra3a73e74c2bb9768ce33dcf0492e2f0fd2f120044541c6b216a67cca%40%3Cdev.kafka.apache.org%3E ).
   
   In that thread, we agreed to instead introduce an opt-in config, which is what this doc is about. Which part doesn't seem great to you?
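To illustrate what a caller gains from a record-less `ConsumerRecords` that still carries metadata, here is a hypothetical caller-side lag cache. `PartitionProgress` and `LagTracker` are illustrative names, and the position and end-offset fields are assumptions about the kind of information exposed via `ConsumerRecords#metadata()`, not its actual accessors. Each poll result refreshes the cached lag, even when it contains no records.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition metadata: consumer position and log end offset.
final class PartitionProgress {
    final long position;
    final long endOffset;

    PartitionProgress(long position, long endOffset) {
        this.position = position;
        this.endOffset = endOffset;
    }
}

// Caller-side cache of lag, refreshed from every poll result.
final class LagTracker {
    private final Map<String, Long> lagByPartition = new HashMap<>();
    private long recordsSeen;

    // Folds one poll result into the cache; records may be empty while metadata is not.
    void onPoll(List<String> records, Map<String, PartitionProgress> metadata) {
        recordsSeen += records.size(); // real processing would happen here
        for (Map.Entry<String, PartitionProgress> e : metadata.entrySet()) {
            // Simplification: lag = end offset - position, floored at zero.
            long lag = Math.max(0L, e.getValue().endOffset - e.getValue().position);
            lagByPartition.put(e.getKey(), lag);
        }
    }

    Long lag(String partition) {
        return lagByPartition.get(partition);
    }

    long recordsSeen() {
        return recordsSeen;
    }
}
```

With the default `return_on_records` behavior, such a cache would be refreshed only when `Consumer#poll` returns, which typically means when records arrive or the timeout elapses.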



