Posted to jira@kafka.apache.org by "kirktrue (via GitHub)" <gi...@apache.org> on 2023/02/16 21:59:54 UTC

[GitHub] [kafka] kirktrue opened a new pull request, #13265: Prototype consumer stubs

kirktrue opened a new pull request, #13265:
URL: https://github.com/apache/kafka/pull/13265

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1109286591


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a <i>wireframe</i> or
+ * <i>sketch</i>, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ol>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ol>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<String> subscription() {
+        return Collections.unmodifiableSet(subscriptions.subscription());
+    }
+
+    /**
+     * @see #subscribe(Collection, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Collection<String> topics) {
+        subscribe(topics, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>Topic list and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as unsubscribing
+            this.unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+            }
+

Review Comment:
   We still need `subscriptions.subscribe()` here, no?





[GitHub] [kafka] philipnee commented on pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13265:
URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440786040

   I feel the real challenge here is determining when to synchronize the background and client state. We should do that deterministically; otherwise it is hard to provide a contract to the user. Anyway, here are a few of my notes (see the sketch after this list):
   
   1. What should the user expect? I think these are the key scenarios:
    - `subscribe()` -> `subscription()`
    - `assign()` -> `assignment()`
    - `seek()` -> `position()`
    - `seek()` -> `poll()` (something returned) -> `position()`
   2. Do we all agree to only drain the queue in `poll()`? There are a few things to consider:
    - When and where to throw `CommitFailedException` (I think right now it can be thrown from any API the user invokes)
    - When and where to invoke the callbacks (commit and rebalance)?
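   
   For concreteness, here is a runnable sketch of those orderings against today's synchronous consumer (the topic, partition, offset, and config values are illustrative assumptions, not from this PR):
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   
   public class OrderingSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "ordering-sketch");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition("orders", 0);
               consumer.assign(Collections.singleton(tp));   // assign() -> assignment()
               System.out.println(consumer.assignment());    // contains orders-0
   
               consumer.seek(tp, 42L);                       // seek() -> position()
               System.out.println(consumer.position(tp));    // 42
   
               consumer.poll(Duration.ofMillis(500));        // seek() -> poll() -> position()
               System.out.println(consumer.position(tp));    // advances past any returned records
   
               consumer.commitAsync((offsets, exception) -> {
                   // In the current consumer this callback fires from a later poll(),
                   // commitSync(), or close(); when it should fire in the new threading
                   // model is exactly the open question above.
                   if (exception != null)
                       System.err.println("commit failed: " + exception);
               });
               consumer.poll(Duration.ofMillis(500));        // the callback may run here
           }
       }
   }
   ```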




[GitHub] [kafka] kirktrue closed pull request #13265: Prototype consumer stubs

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue closed pull request #13265: Prototype consumer stubs
URL: https://github.com/apache/kafka/pull/13265




[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115056524


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   Oh, my mistake; I entirely forgot that there's no rebalancing after `assignment()`.





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115068319


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as unsubscribing
+            this.unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+            }
+
+            throwIfNoAssignorsConfigured();
+            eventHandler.add(new SubscribeTopicsEvent(topics, callback));
+        }
+    }
+
+    /**
+     * @see #subscribe(Pattern, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>{@link Pattern} and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        eventHandler.add(new SubscribePatternEvent(pattern, callback));
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>None</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void unsubscribe() {
+        eventHandler.add(new UnsubscribeEvent());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void assign(Collection<TopicPartition> partitions) {
+        if (partitions == null) {
+            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
+        } else if (partitions.isEmpty()) {
+            this.unsubscribe();
+        } else {
+            for (TopicPartition tp : partitions) {
+                String topic = (tp != null) ? tp.topic() : null;
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+            }
+
+            eventHandler.add(new AssignPartitionsEvent(partitions));
+        }
+    }
+
+    @Deprecated
+    @Override
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(time.timer(timeoutMs), false);
+    }
+
+    @Override
+    public ConsumerRecords<K, V> poll(Duration timeout) {
+        return poll(time.timer(timeout), true);
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   Got it, thanks for clarifying. I'll lay out my understanding and expectations here (see the sketch after this list):
   
   1. `assign()` -> `assignment()`: returns the latest assigned partitions.
   2. `subscribe()` -> `assignment()`: still returns the old assignment, because poll() hasn't been called yet, so nothing has been updated.
   3. `subscribe()` -> `poll()` [rebalance finished, callback triggered] -> `assignment()`: returns the latest assignment.
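   
   As a concrete illustration of those three expectations, here is a hedged sketch against the current Consumer API, assuming a `consumer` configured as in the OrderingSketch earlier in this thread (topic names are illustrative):
   
   ```java
   TopicPartition tp = new TopicPartition("orders", 0);
   
   // 1. assign() -> assignment(): the new partition list is visible immediately.
   consumer.assign(Collections.singleton(tp));
   System.out.println(consumer.assignment());   // contains orders-0
   
   // (Switching from assign() to subscribe() requires unsubscribe() in between.)
   consumer.unsubscribe();
   
   // 2. subscribe() -> assignment(): subscribe() alone does not change assignment();
   //    nothing updates until a poll() drives the rebalance.
   consumer.subscribe(Collections.singleton("payments"));
   System.out.println(consumer.assignment());   // still empty here
   
   // 3. subscribe() -> poll() [rebalance finished, callback triggered] -> assignment():
   //    now returns the latest assignment.
   consumer.poll(Duration.ofMillis(500));
   System.out.println(consumer.assignment());   // partitions of "payments" once rebalanced
   ```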





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114928370


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   I feel we need an explicit synchronization barrier to ensure state changes happen in a deterministic sequence. I mentioned this idea in our previous meeting, but I want to bring it up again for discussion (feel free to strike it down):
   
   Is it reasonable to ask users to invoke poll() after any sort of assignment to trigger the rebalance? I think we could use poll() as an explicit barrier to synchronize both threads.
   
   So, back to your example (see the sketch after this list):
   1. The user updates the assignment via `assign()`.
   2. Read calls (`assignment()`) here still return the previous state.
   3. The client invokes poll(), which updates the client and background state and triggers the rebalance. The rebalance callback might be triggered here.
   4. `assignment()` returns the updated assignment. Invalid fetches are dropped here.
   
   I think this somewhat breaks the current contract, since `assign()` -> `assignment()` should already reflect the current assignment. But regardless of how we handle this, there will be some behavioral change: the threading model is now asynchronous, which implies that data updates are expected to be asynchronous as well.
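   
   To make the barrier idea concrete, here is a minimal self-contained sketch of poll() acting as the synchronization point; the class and field names are hypothetical, not the PR's:
   
   ```java
   import java.util.Collections;
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   import org.apache.kafka.common.TopicPartition;
   
   class PollBarrierSketch {
       // Replies from the background thread, drained only inside poll().
       private final BlockingQueue<Set<TopicPartition>> backgroundUpdates = new LinkedBlockingQueue<>();
       private Set<TopicPartition> assignment = Collections.emptySet();   // foreground snapshot
   
       // Step 1: the user updates the assignment; only an event crosses the thread boundary.
       void assign(Set<TopicPartition> partitions) {
           // Stand-in for eventHandler.add(new AssignPartitionsEvent(partitions)); in the
           // real prototype the background thread would post its reply to backgroundUpdates.
           backgroundUpdates.offer(partitions);
       }
   
       // Step 2: reads between assign() and poll() still return the previous state.
       Set<TopicPartition> assignment() {
           return assignment;
       }
   
       // Step 3: poll() is the explicit barrier that applies background state updates
       // (and is where the rebalance callback would be triggered).
       void poll() {
           Set<TopicPartition> update;
           while ((update = backgroundUpdates.poll()) != null)
               assignment = update;   // Step 4: later assignment() calls see the new list
       }
   }
   ```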





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114930390


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   After all, I think it is a worse user experience if the assignment call cannot yield deterministic results. Even though we would need a KIP for this behavior change, I think it gives the user a real guarantee.





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1109283896


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##########
@@ -19,23 +19,35 @@
 /**
  * This is the abstract definition of the events created by the KafkaConsumer API
  */
-abstract public class ApplicationEvent {
-    public final Type type;
+public abstract class ApplicationEvent {
+
+    public enum Type {
+        NOOP,
+        ASSIGN_PARTITIONS,
+        COMMIT_ASYNC,

Review Comment:
   I wonder if we need to treat async and sync commit differently. I think they look the same from the background thread's perspective.
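   
   One way that could look, sketched with hypothetical names (not the PR's event classes): a single commit event carrying a completion future, where sync vs. async is decided entirely on the application thread.
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.LinkedBlockingQueue;
   
   class CommitPathSketch {
       // One event type for both commit paths.
       static class CommitEvent {
           final CompletableFuture<Void> result = new CompletableFuture<>();
       }
   
       private final BlockingQueue<CommitEvent> events = new LinkedBlockingQueue<>();
   
       // Application thread, async path: enqueue and register a completion callback.
       void commitAsync(Runnable onComplete) {
           CommitEvent event = new CommitEvent();
           event.result.thenRun(onComplete);
           events.offer(event);
       }
   
       // Application thread, sync path: enqueue the same event type, then block on it.
       void commitSync() {
           CommitEvent event = new CommitEvent();
           events.offer(event);
           event.result.join();   // the only difference: the caller waits
       }
   
       // Background thread: handles both identically; it never needs to know which path.
       void backgroundLoop() throws InterruptedException {
           while (true) {
               CommitEvent event = events.take();
               // ... send the OffsetCommit request; when the response arrives:
               event.result.complete(null);
           }
       }
   }
   ```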





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115047665


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   In the `assign()` -> `assignment()` example above, there is no rebalance triggered, and hence no rebalance listener is triggered either. The rebalance listener is only relevant in the `subscribe()` scenarios.
   
   So, to summarize all the scenarios here and avoid confusion between us :) (sketched in code after this list)
   
   1. `assign()`: a following `assignment()` should always reflect the latest partition list from the previous `assign()`, and a following `poll()` should return data from the latest partition list only (i.e. the background thread, upon getting the new partition list, should clean up the buffered records accordingly, and the following `poll()` call should return empty data if no new data has been retrieved within the poll timeout yet).
   2. `subscribe()`: a following `assignment()` could still return the current assignment, which is inconsistent with the `subscribe()` (this is the case today, though as I mentioned above it is open for discussion whether we want to change it), or it could return the new assignment if enough time has elapsed for the rebalance to complete; a following `poll()`, though, should only return data from the latest partition list resulting from the new subscription.
   3. `seek()`/`seekToBeginning()`/`seekToEnd()`: a following `position()` should reflect the newly set position, and a following `poll()` should only return data from the new position onwards.
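   
   For the cleanup step in scenario 1, a minimal sketch of the background thread's buffer handling (hypothetical names, not the PR's fetcher):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.common.TopicPartition;
   
   class FetchBufferSketch {
       // Records buffered by the background thread, keyed by partition.
       private final Map<TopicPartition, List<byte[]>> buffered = new HashMap<>();
   
       // On a new partition list, drop buffered data for partitions no longer owned.
       void onNewAssignment(Set<TopicPartition> newPartitions) {
           buffered.keySet().retainAll(newPartitions);
       }
   
       // poll() can then only hand back data from the latest partition list, and it
       // returns empty if nothing new was fetched within the poll timeout.
       List<byte[]> poll(Set<TopicPartition> assignment) {
           List<byte[]> out = new ArrayList<>();
           for (TopicPartition tp : assignment) {
               List<byte[]> records = buffered.remove(tp);
               if (records != null)
                   out.addAll(records);
           }
           return out;
       }
   }
   ```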





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114851962


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   Sorry, I don't think I'm getting this part: `for the former, the source of truth is on the background thread` (I assume you mean the subscription). The current expectation, as previously discussed, is that subscribe() -> subscription() should yield the most up-to-date subscription. Changing the ownership would break this assumption, no?
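   
   A minimal sketch of why moving the source of truth would break that expectation (all names hypothetical): if subscribe() only enqueues an event for the background thread, a subscription() read racing with the background update can observe stale state.
   
   ```java
   import java.util.Collections;
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   class OwnershipSketch {
       // Background-owned state, written only by the background thread.
       private volatile Set<String> subscription = Collections.emptySet();
       private final BlockingQueue<Set<String>> events = new LinkedBlockingQueue<>();
   
       void subscribe(Set<String> topics) {
           events.offer(topics);              // foreground thread: enqueue only
       }
   
       Set<String> subscription() {
           return subscription;               // may still return the old value here
       }
   
       void backgroundLoop() throws InterruptedException {
           while (true) {
               subscription = events.take();  // the update happens asynchronously
           }
       }
   }
   ```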





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114721314


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>

Review Comment:
   Where would this list of partitions come from? If it is from the current assignment, then the background thread would have this as well, right? Or is it only for the manual-assign scenario?

   In general I think the synchronization between the foreground and background threads on the assignment would be different for the `subscribe` and manual `assign` scenarios: in the former case, the background thread would have the up-to-date assignment from rebalances, and the foreground may have a slightly stale one, while in the latter case it's the opposite, as the background thread may have a stale assignment.
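
   As a rough illustration of that asymmetry (a hypothetical sketch, not the real internals; `String` stands in for `TopicPartition`), the manual `assign` case could look like this, with the foreground owning the truth and the background applying it when it drains the event:

   ```
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   class ManualAssignSketch {
       private volatile Set<String> foregroundAssignment = Set.of(); // source of truth
       private Set<String> backgroundAssignment = Set.of();          // possibly stale copy
       private final BlockingQueue<Set<String>> events = new LinkedBlockingQueue<>();

       void assign(Set<String> partitions) {              // user thread
           foregroundAssignment = Set.copyOf(partitions); // truth updated synchronously
           events.add(foregroundAssignment);              // background catches up later
       }

       void runBackground() throws InterruptedException { // background thread
           while (!Thread.currentThread().isInterrupted()) {
               backgroundAssignment = events.take();      // apply the latest assignment
           }
       }
   }
   ```

   In the `subscribe` case the arrows simply reverse: the background thread writes the assignment after a rebalance, and the foreground consumes a snapshot from the other queue.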



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the

Review Comment:
   Just to clarify: the foreground thread would simply retrieve data from the raw records buffer that the background thread keeps accumulating into, right? Of course, the raw records may have been fetched for an old assignment, in which case they still need to be trimmed by the foreground thread.
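
   For what the trimming could look like on the foreground side (illustrative sketch; `isRelevant` in the stub above plays this role, and the raw-bytes types here are placeholders):

   ```
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   final class FetchTrimming {
       // Drop raw batches for partitions that left the assignment since the fetch was issued.
       static Map<String, List<byte[]>> trim(Map<String, List<byte[]>> rawByPartition,
                                             Set<String> currentAssignment) {
           Map<String, List<byte[]>> kept = new HashMap<>();
           for (Map.Entry<String, List<byte[]>> entry : rawByPartition.entrySet()) {
               if (currentAssignment.contains(entry.getKey())) // still assigned?
                   kept.put(entry.getKey(), entry.getValue());
           }
           return kept; // only these batches get deserialized into ConsumerRecords
       }
   }
   ```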



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   While I agree this func should be non-blocking, I still want to clarify the ordering restrictions of this func when used together with others like `poll`. For example, assume the following events happen on the background thread side in the `subscribe` scenario:

   1) Currently assigned partitions A and B.
   2) A rebalance happens, and the new assignment is updated to C and D.
   3) A notification about the new assignment is sent via the foreground queue, to let the foreground thread trigger the corresponding rebalance listener.
   4) All records in the raw records buffer for partitions A and B are dropped, and new data is fetched from partitions C and D.

   Now suppose the foreground thread calls `assignment` between times 2) and 3): it would still return `{A, B}`, and then later when it calls `poll` between times 3) and 4), it should immediately trigger the rebalance listener, and records from C and D can be returned afterwards.

   If the foreground thread calls `assignment` between times 3) and 4), the foreground thread already knows about the new assignment, so it could return `{C, D}`. HOWEVER, since it has not yet called `poll`, it would not trigger the rebalance listener --- this is because the docs state that

   ```
   This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(long)} call whenever partition assignment changes.
   ```

   This is a bit weird since it is out of order from the caller's perspective: when I call `assignment()` and then `poll()`, I would already have the new assignment, but the listener indicating that the assignment has changed via `onPartitionsRevoked/Assigned` would only fire later in the `poll`.

   While searching the docs I did not find anywhere that we state this ordering is guaranteed, but I am concerned there are users who assume this ordering in their app development.
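
   The hazard is easy to reproduce in miniature (hypothetical sketch; a plain `java.util.function.Consumer` stands in for the rebalance listener): the snapshot read by `assignment()` is updated at step 3), but the listener only fires when `poll()` later drains the notification queue.

   ```
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   class ListenerOrderingSketch {
       private volatile Set<String> assignment = Set.of("A", "B");
       private final BlockingQueue<Set<String>> notifications = new LinkedBlockingQueue<>();

       void onBackgroundRebalance(Set<String> newAssignment) { // background thread, step 3)
           assignment = newAssignment;       // assignment() now returns {C, D}...
           notifications.add(newAssignment); // ...but the listener is deferred to poll()
       }

       Set<String> assignment() {
           return assignment;
       }

       void poll(java.util.function.Consumer<Set<String>> rebalanceListener) { // user thread
           Set<String> pending = notifications.poll();
           if (pending != null)
               rebalanceListener.accept(pending); // fires after assignment() already changed
       }
   }
   ```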



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   If we still want to respect the ordering of assignment changes from the caller's perspective here, I think we'd consider either 1) letting the `assignment()` call keep returning the stale assignment until the rebalance listener has been triggered, to be consistent with the current behavior; or 2) changing the `listener` API so that it may also be triggered in funcs other than `poll`, e.g. in `assignment` as well.

   Personally I'd prefer we do 1), but would love to hear your thoughts as well @hachikuji @philipnee
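
   A sketch of option 1) under those assumptions: the user-visible snapshot is only advanced inside `poll()`, immediately after the listener runs, so `assignment()` can never get ahead of the listener. Since both methods run on the user thread, the snapshot needs no extra synchronization.

   ```
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   class StaleUntilPollSketch {
       private Set<String> userVisibleAssignment = Set.of("A", "B"); // touched only in poll()
       private final BlockingQueue<Set<String>> notifications = new LinkedBlockingQueue<>();

       void onBackgroundRebalance(Set<String> newAssignment) { // background thread
           notifications.add(newAssignment);
       }

       Set<String> assignment() {
           return userVisibleAssignment; // stays {A, B} until poll() processes the change
       }

       void poll(java.util.function.Consumer<Set<String>> rebalanceListener) { // user thread
           Set<String> pending = notifications.poll();
           if (pending != null) {
               rebalanceListener.accept(pending);  // listener first...
               userVisibleAssignment = pending;    // ...then the visible assignment advances
           }
       }
   }
   ```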



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the
+        // potentially expensive task of deserializing the record won't stall out our background thread.
+        //for (SerializedRecordWrapper recordWrapper : wrappers) {
+        //    TopicPartition tp = recordWrapper.topicPartition();
+        //
+        //    // Make sure that this topic partition is still on our set of subscribed topics/assigned partitions,
+        //    // as this might have changed since the fetcher submitted the fetch request.
+        //    if (isRelevant(tp)) {
+        //        ConsumerRecord<K, V> record = parseRecord(recordWrapper);
+        //        List<ConsumerRecord<K, V>> list = records.computeIfAbsent(tp, __ -> new ArrayList<>());
+        //        list.add(record);
+        //    }
+        //}
+
+        return !records.isEmpty() ? new ConsumerRecords<>(records) : ConsumerRecords.empty();
+    }
+
+    /**
+     * Mostly stolen from Fetcher.CompletedFetch's parseRecord...
+     */
+    private ConsumerRecord<K, V> parseRecord(Record record,
+                                             TopicPartition partition,
+                                             TimestampType timestampType,
+                                             Optional<Integer> leaderEpoch) {
+        try {
+            long offset = record.offset();
+            long timestamp = record.timestamp();
+            Headers headers = new RecordHeaders(record.headers());
+            ByteBuffer keyBytes = record.key();
+            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            ByteBuffer valueBytes = record.value();
+            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+                    timestamp, timestampType,
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+                    key, value, headers, leaderEpoch);
+        } catch (RuntimeException e) {
+            throw new RecordDeserializationException(partition, record.offset(),
+                    "Error deserializing key/value for partition " + partition +
+                            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+        }
+    }
+
+    @Override
+    public void commitSync() {
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Duration timeout) {
+        commitSync(subscriptions.allConsumed(), timeout);
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsSync logic
+        eventHandler.addAndGet(new CommitSyncEvent(offsets), time.timer(timeout));
+    }
+
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        commitAsync(subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsAsync logic
+        eventHandler.add(new CommitAsyncEvent(offsets, callback));
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        // TODO: This needs to be propagated to the background thread, right?

Review Comment:
   Yes, I think so. The next `poll` should also respect this call, so the `poll` call itself should block until the background thread has processed this event and updated its fetch position accordingly; the records returned from the raw records buffer should be trimmed based on the new position as well.
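
   One way to get that blocking behavior (a sketch; `SeekEvent` and its fields are made up for illustration) is to have `seek()` enqueue an event carrying a future and wait for the background thread to complete it once the fetch position has been updated:

   ```
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.LinkedBlockingQueue;

   final class SeekSketch {
       record SeekEvent(String partition, long offset, CompletableFuture<Void> done) {}

       private final BlockingQueue<SeekEvent> eventQueue = new LinkedBlockingQueue<>();

       void seek(String partition, long offset) throws Exception {
           CompletableFuture<Void> done = new CompletableFuture<>();
           eventQueue.add(new SeekEvent(partition, offset, done));
           done.get(); // block until the background thread has applied the new position
       }

       void runBackground() throws InterruptedException { // background thread
           while (!Thread.currentThread().isInterrupted()) {
               SeekEvent event = eventQueue.take();
               // ... update the fetch position and drop buffered records for this partition ...
               event.done().complete(null); // unblock the caller
           }
       }
   }
   ```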



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   Again, I think we need to separate the scenarios of `subscribe` vs. manual `assign`: for the former, the source of truth is on the background thread, and the foreground thread would be asynchronously brought up to date via the queues; for the latter, the foreground would hold the source of truth and the background thread would be updated asynchronously.
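
   A minimal sketch of that split, assuming a hypothetical shared queue and request types (none of these names are from the PR): `subscribe` only enqueues and lets the background publish the resulting assignment back, while `assign` mutates the local view first and notifies the background afterwards.

   ```
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   import org.apache.kafka.common.TopicPartition;

   final class OwnershipSketch {
       private final BlockingQueue<Object> backgroundQueue = new LinkedBlockingQueue<>();
       private final Set<TopicPartition> localAssignment = new HashSet<>();

       // subscribe(): the background owns the truth; the foreground just
       // enqueues the request and is brought up to date asynchronously.
       void subscribe(Collection<String> topics) {
           backgroundQueue.add(new SubscribeTopicsRequest(topics));
           // localAssignment is updated later, when the foreground drains an
           // assignment-changed event published by the background in poll().
       }

       // assign(): the foreground owns the truth; mutate locally first, then
       // tell the background so it can redirect its fetch sessions.
       void assign(Collection<TopicPartition> partitions) {
           localAssignment.clear();
           localAssignment.addAll(partitions);
           backgroundQueue.add(new AssignPartitionsRequest(new HashSet<>(partitions)));
       }

       static final class SubscribeTopicsRequest {
           final Collection<String> topics;
           SubscribeTopicsRequest(Collection<String> topics) { this.topics = topics; }
       }

       static final class AssignPartitionsRequest {
           final Set<TopicPartition> partitions;
           AssignPartitionsRequest(Set<TopicPartition> partitions) { this.partitions = partitions; }
       }
   }
   ```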



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the
+        // potentially expensive task of deserializing the record won't stall out our background thread.
+        //for (SerializedRecordWrapper recordWrapper : wrappers) {
+        //    TopicPartition tp = recordWrapper.topicPartition();
+        //
+        //    // Make sure that this topic partition is still on our set of subscribed topics/assigned partitions,
+        //    // as this might have changed since the fetcher submitted the fetch request.
+        //    if (isRelevant(tp)) {
+        //        ConsumerRecord<K, V> record = parseRecord(recordWrapper);
+        //        List<ConsumerRecord<K, V>> list = records.computeIfAbsent(tp, __ -> new ArrayList<>());
+        //        list.add(record);
+        //    }
+        //}
+
+        return !records.isEmpty() ? new ConsumerRecords<>(records) : ConsumerRecords.empty();
+    }
+
+    /**
+     * Mostly stolen from Fetcher.CompletedFetch's parseRecord...
+     */
+    private ConsumerRecord<K, V> parseRecord(Record record,
+                                             TopicPartition partition,
+                                             TimestampType timestampType,
+                                             Optional<Integer> leaderEpoch) {
+        try {
+            long offset = record.offset();
+            long timestamp = record.timestamp();
+            Headers headers = new RecordHeaders(record.headers());
+            ByteBuffer keyBytes = record.key();
+            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            ByteBuffer valueBytes = record.value();
+            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+                    timestamp, timestampType,
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+                    key, value, headers, leaderEpoch);
+        } catch (RuntimeException e) {
+            throw new RecordDeserializationException(partition, record.offset(),
+                    "Error deserializing key/value for partition " + partition +
+                            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+        }
+    }
+
+    @Override
+    public void commitSync() {
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Duration timeout) {
+        commitSync(subscriptions.allConsumed(), timeout);
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsSync logic
+        eventHandler.addAndGet(new CommitSyncEvent(offsets), time.timer(timeout));
+    }
+
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        commitAsync(subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsAsync logic
+        eventHandler.add(new CommitAsyncEvent(offsets, callback));
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        // TODO: This needs to be propagated to the background thread, right?
+        subscriptions.seek(partition, offset);
+    }
+
+    @Override
+    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
+        // TODO: This needs to be propagated to the background thread, right?
+    }
+
+    @Override
+    public void seekToBeginning(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public void seekToEnd(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public long position(TopicPartition partition) {

Review Comment:
   Again, I'd like to emphasize the ordering of this function relative to the other functions here. Just to demonstrate, here's an example:
   
   ```
   0: buffered records from offset 0 to offset 10 (exclusive), no acked position offset, fetch offset 10 (i.e. for the next fetch request).
   1: Foreground **poll** returned records [0, 5). **Buffered** [5, 10), acked **position** offset 0, **fetch** offset 10.
   2: Background gets records [10, 20) from fetch response. **Buffered** [5, 20), acked **position** offset 0, **fetch** offset 20.
   3: Foreground **position** call returns 0 still.
   4: Foreground **poll** returned records [5, 15). **Buffered** [15, 20), acked **position** offset 5, **fetch** offset 20.
   5: Foreground **position** call returns 5.
   6: Foreground **seekToBeginning**. **Buffered** [15, 20), acked **position** offset 0, **fetch** offset 20.
   7: Foreground **position** call returns 0.
   8: Foreground **poll** is called again; it cannot return any data yet since the buffered records do not contain any for the starting position.
   9: Background clears the buffered records, resets its **fetch** offset to 0 and sends the next fetch request.
   10: Background finally gets some data from the fetch response, say [0, 15); before this is done, all **poll** calls from the foreground return nothing.
   11: The next **poll** call could then return records [0, 10), acked position is still 0, and fetch offset is 15.
   ```
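
   The same walkthrough can be captured in a toy, single-partition model (all names are invented for illustration; ranges of long offsets stand in for real records). Running the steps above against it reproduces the buffered/position/fetch-offset transitions:

   ```
   // Toy model of the bookkeeping above: one partition, long offsets only.
   final class OffsetModel {
       long bufferedStart = 0, bufferedEnd = 10; // records held by the background
       long consumed = 0;                        // next offset the application wants
       long position = -1;                       // acked position (start of last returned batch)
       long fetchOffset = 10;                    // where the background's next fetch starts

       // poll(): return [from, to) from the buffer, or null if the buffer
       // does not start at the wanted offset (e.g. right after a seek).
       long[] poll(int max) {
           if (consumed != bufferedStart || bufferedStart == bufferedEnd)
               return null;
           long from = consumed;
           long to = Math.min(bufferedEnd, from + max);
           position = from;      // ack where this batch starts (steps 1 and 4)
           consumed = to;
           bufferedStart = to;   // those records leave the buffer
           return new long[] {from, to};
       }

       // seekToBeginning(): the position moves immediately (step 7); clearing
       // the buffer and resetting fetchOffset happens asynchronously in the
       // background (steps 9-10) and is not modeled here.
       void seekToBeginning() {
           consumed = 0;
           position = 0;
       }
   }
   ```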



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1109264154


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   The client thread will need to send an explicit ack to the background thread to acknowledge the previously consumed records for autocommit.
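
   A sketch of what that explicit ack could look like, inside the stub above, assuming a hypothetical AckConsumedPositionsEvent (not in the PR): the foreground enqueues the offsets it has just handed to the application, and the background autocommits from those rather than from its raw fetch progress.

   ```
   // Hypothetical event type, invented for this sketch.
   final class AckConsumedPositionsEvent {
       final Map<TopicPartition, OffsetAndMetadata> consumedPositions;

       AckConsumedPositionsEvent(Map<TopicPartition, OffsetAndMetadata> consumedPositions) {
           this.consumedPositions = consumedPositions;
       }
   }

   // In the foreground, at the end of poll(), before returning the records:
   //
   //     Map<TopicPartition, OffsetAndMetadata> acks = new HashMap<>();
   //     records.forEach((tp, list) -> acks.put(tp,
   //             new OffsetAndMetadata(list.get(list.size() - 1).offset() + 1)));
   //     eventHandler.add(new AckConsumedPositionsEvent(acks));
   ```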



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);

Review Comment:
   I feel we should split it into two parts:
   1. `handler.drain()`: drain all events in the queue and process them at once, including the callback triggers, errors, etc.
   2. `if (hasFetches()) return ...`
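
   For illustration, the two-phase poll() body could be shaped like the sketch below; drain(), hasFetches(), collectFetches(), process() and BackgroundEvent are assumptions, not methods or types on the current EventHandler.

   ```
   private ConsumerRecords<K, V> pollSketch(final Timer timer) {
       do {
           // Phase 1: process everything the background has published so far,
           // e.g. rebalance listener callbacks, async commit callbacks, errors.
           for (BackgroundEvent event : eventHandler.drain())   // assumed API
               process(event);                                  // assumed helper

           // Phase 2: if fetched data is already buffered, hand it back.
           if (eventHandler.hasFetches())                       // assumed API
               return collectFetches();

           timer.update();
       } while (timer.notExpired());

       return ConsumerRecords.empty();
   }
   ```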



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches

Review Comment:
   I think the client thread would need to perform the collectFetches step.
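
   As a sketch of that foreground-side step, inside the stub above (the RawFetch carrier type is invented; parseRecord is the method from the stub):

   ```
   // Invented carrier for raw, undeserialized data handed over by the background.
   static final class RawFetch {
       TopicPartition partition;
       Iterable<Record> records;
       TimestampType timestampType;
       Optional<Integer> leaderEpoch;
   }

   private ConsumerRecords<K, V> collectFetches(List<RawFetch> rawFetches) {
       Map<TopicPartition, List<ConsumerRecord<K, V>>> out = new HashMap<>();
       for (RawFetch raw : rawFetches) {
           // Skip partitions we no longer own; the assignment may have changed
           // since the background sent the fetch request.
           if (!subscriptions.isAssigned(raw.partition))
               continue;
           List<ConsumerRecord<K, V>> list = out.computeIfAbsent(raw.partition, tp -> new ArrayList<>());
           for (Record record : raw.records)
               list.add(parseRecord(record, raw.partition, raw.timestampType, raw.leaderEpoch));
       }
       return out.isEmpty() ? ConsumerRecords.empty() : new ConsumerRecords<>(out);
   }
   ```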



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {

Review Comment:
   I guess this is effectively a blocking operation, so we need a do...while() loop to wait for sufficient fetches to return. In fact, I wonder if we could just return a CompletableFuture.
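
   A minimal sketch of both options, assuming a simple handover queue between the foreground and background threads (every name and type here is a placeholder, not the prototype's actual classes):
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   public class PollLoopSketch<R> {
   
       // Completed fetches handed over from the background thread.
       private final BlockingQueue<List<R>> completedFetches = new LinkedBlockingQueue<>();
   
       // do...while variant: block on the handover queue until records arrive
       // or the caller's timeout expires.
       public List<R> poll(Duration timeout) throws InterruptedException {
           long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
           do {
               long remainingMs = deadlineMs - System.currentTimeMillis();
               List<R> batch = completedFetches.poll(Math.max(remainingMs, 0), TimeUnit.MILLISECONDS);
               if (batch != null && !batch.isEmpty())
                   return batch;
           } while (System.currentTimeMillis() < deadlineMs);
           return Collections.emptyList(); // timed out with no records
       }
   
       // CompletableFuture variant: the background thread would complete the
       // returned future once a fetch response is ready.
       public CompletableFuture<List<R>> pollAsync() {
           return new CompletableFuture<>();
       }
   }
   ```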





[GitHub] [kafka] guozhangwang commented on pull request #13265: Prototype consumer stubs

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13265:
URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440933102

   Thanks @philipnee, I left a comment to give my 2c regarding your question 1) above.
   
   For your question 2), I think we should prefer to stay consistent with the current guarantees if they're already stated in the public APIs, unless there's a good rationale to change them (in which case we'd need to very vocally communicate it as a breaking change in the release). For example:
   
   1) We documented that rebalance listener callbacks are only related to `subscribe` scenarios and would only be triggered within `poll()`, which is the only time the assignment can change.
   2) We also documented that the async commit listener callbacks are only triggered within `poll()`.
   3) In our javadocs, the CommitFailedException would only be thrown from the `commitSync` functions.
   
   Putting all of those together, I think it means the `poll()` call should drain the queues, since if there are any events requiring callbacks to be triggered, they should be triggered in that call.
   
   For `commitSync`, technically it would need to wait for the corresponding commit response event, but since there's only a single queue, we would still need to keep polling that queue until that event is received.
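
   To make that drain loop concrete, here is a minimal sketch under the single-queue assumption (the event types are invented for illustration, and the RuntimeException stands in for CommitFailedException):
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   public class CommitSyncSketch {
   
       interface BackgroundEvent { }
   
       static final class CallbackEvent implements BackgroundEvent {
           final Runnable callback;
           CallbackEvent(Runnable callback) { this.callback = callback; }
       }
   
       static final class CommitResponseEvent implements BackgroundEvent {
           final Exception error; // null on success
           CommitResponseEvent(Exception error) { this.error = error; }
       }
   
       // Single queue shared with the background thread.
       private final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();
   
       public void commitSync() throws InterruptedException {
           while (true) {
               BackgroundEvent event = backgroundEvents.take();
               if (event instanceof CommitResponseEvent) {
                   Exception error = ((CommitResponseEvent) event).error;
                   if (error != null)
                       throw new RuntimeException("commit failed", error); // stand-in for CommitFailedException
                   return;
               } else if (event instanceof CallbackEvent) {
                   // Keep dispatching other callbacks while waiting, so the
                   // documented callback guarantees still hold.
                   ((CallbackEvent) event).callback.run();
               }
           }
       }
   }
   ```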




[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1109260958


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {

Review Comment:
   👍 





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115037535


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   Sorry, I was not clear before: by `source of truth` I meant the assignment, i.e. the list of partition infos, not the subscription, i.e. the list of topics or the topic regex. When the user calls `subscribe()`, a following `subscription()` call should definitely reflect the newly updated subscription info from `subscribe()`, but a following `assignment()` call would only be updated once the rebalance completes, at which point it is updated by the background thread.
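
   As a rough illustration of the two sources of truth, assuming two volatile fields (the names and types are assumptions, not the PR's code, and String stands in for the real topic/partition types):
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   
   public class SubscriptionStateSketch {
   
       // Written by the foreground thread in subscribe(); visible immediately.
       private volatile Set<String> subscription = Collections.emptySet();
   
       // Written only by the background thread once a rebalance completes.
       private volatile Set<String> assignment = Collections.emptySet();
   
       public void subscribe(Set<String> topics) {
           subscription = Collections.unmodifiableSet(new HashSet<>(topics));
       }
   
       public Set<String> subscription() {
           return subscription; // reflects subscribe() right away
       }
   
       // Invoked by the background thread at rebalance completion.
       void onRebalanceComplete(Set<String> newAssignment) {
           assignment = Collections.unmodifiableSet(new HashSet<>(newAssignment));
       }
   
       public Set<String> assignment() {
           return assignment; // lags behind until the rebalance completes
       }
   }
   ```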





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114928370


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   I feel we need an explicit synchronization barrier to ensure state changes happen in a deterministic sequence. I mentioned this idea in our previous meeting, but I just want to bring it up again for discussion (feel free to strike down this idea):
   
   Is it reasonable to ask users to invoke poll() after any sort of assignment to trigger the rebalance? I think we could use poll() as an explicit barrier to synchronize both threads.
   
   So, back to your example:
   1. The user updates the assignment via `assign()`.
   2. Read calls (`assignment()`) here will still return the previous state.
   3. The client invokes poll(), which updates the client and background state and triggers the rebalance. The rebalance callback is triggered.
   4. `assignment()` returns the updated version of the assignment. Invalid fetches will be dropped here.
   
   I think it somewhat breaks the current contract, as `assign()` -> `assignment()` should already reflect the current assignment. But I do think that regardless of how we handle this, there will be some behavioral changes, as the threading model is now asynchronous, which implies that data updates are expected to be asynchronous. A rough sketch of this barrier idea is below.
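
   Something like this, with all names invented for illustration (String stands in for TopicPartition to keep it self-contained):
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class PollBarrierSketch {
   
       private final ConcurrentLinkedQueue<Set<String>> pendingAssignments = new ConcurrentLinkedQueue<>();
       private volatile Set<String> assignment = Collections.emptySet();
   
       // Step 1: only record the requested change.
       public void assign(Set<String> partitions) {
           pendingAssignments.add(new HashSet<>(partitions));
       }
   
       // Step 2: still returns the previous state until poll() applies the change.
       public Set<String> assignment() {
           return assignment;
       }
   
       // Steps 3 and 4: poll() acts as the barrier that applies pending updates;
       // the rebalance and its listener callbacks would be triggered here.
       public void poll() {
           Set<String> next;
           while ((next = pendingAssignments.poll()) != null) {
               assignment = Collections.unmodifiableSet(next);
           }
       }
   }
   ```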





[GitHub] [kafka] github-actions[bot] commented on pull request #13265: Prototype consumer stubs

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13265:
URL: https://github.com/apache/kafka/pull/13265#issuecomment-1585813172

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).
   If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

