Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/22 18:34:58 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114721314


##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a <i>wireframe</i> or
+ * <i>sketch</i>, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ol>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ol>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<String> subscription() {
+        return Collections.unmodifiableSet(subscriptions.subscription());
+    }
+
+    /**
+     * @see #subscribe(Collection, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Collection<String> topics) {
+        subscribe(topics, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>Topic list and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as unsubscribing
+            this.unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+            }
+
+            throwIfNoAssignorsConfigured();
+            eventHandler.add(new SubscribeTopicsEvent(topics, callback));
+        }
+    }
+
+    /**
+     * @see #subscribe(Pattern, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>{@link Pattern} and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        eventHandler.add(new SubscribePatternEvent(pattern, callback));
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>None</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void unsubscribe() {
+        eventHandler.add(new UnsubscribeEvent());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void assign(Collection<TopicPartition> partitions) {
+        if (partitions == null) {
+            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
+        } else if (partitions.isEmpty()) {
+            this.unsubscribe();
+        } else {
+            for (TopicPartition tp : partitions) {
+                String topic = (tp != null) ? tp.topic() : null;
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+            }
+
+            eventHandler.add(new AssignPartitionsEvent(partitions));
+        }
+    }
+
+    @Deprecated
+    @Override
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(time.timer(timeoutMs), false);
+    }
+
+    @Override
+    public ConsumerRecords<K, V> poll(Duration timeout) {
+        return poll(time.timer(timeout), true);
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>

Review Comment:
   Where would this list of partitions come from? If it is from the current assignment, then the background thread would already have it, right? Or is it only for the manual-assign scenario?
   
   In general I think the synchronization between the foreground and background threads on the assignment would be different for the `subscribe` and manual `assign` scenarios: in the former, the background thread has the up-to-date assignment from rebalances and the foreground may have a slightly stale one, while in the latter it is the opposite, as the background thread may have a stale assignment.
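   
   To make the two directions concrete, a minimal sketch (the class, queue, and method names here are hypothetical, just for illustration -- not from the PR):
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   import org.apache.kafka.common.TopicPartition;
   
   class AssignmentSyncSketch {
       // subscribe scenario: the background thread owns the assignment and publishes
       // updates; the foreground drains them lazily, so its view may briefly lag.
       private final BlockingQueue<Set<TopicPartition>> fromBackground = new LinkedBlockingQueue<>();
       private volatile Set<TopicPartition> foregroundView = new HashSet<>();
   
       void onRebalanceCompleted(Set<TopicPartition> newAssignment) { // background thread
           fromBackground.offer(newAssignment);
       }
   
       void refreshForegroundView() {                                 // foreground thread
           Set<TopicPartition> update;
           while ((update = fromBackground.poll()) != null)
               foregroundView = update;
       }
   
       // assign scenario: the foreground is authoritative and the background is only
       // notified asynchronously, so here it is the *background* that may be stale.
       void assignFromUser(Set<TopicPartition> partitions) {          // foreground thread
           foregroundView = new HashSet<>(partitions);
           // eventHandler.add(new AssignPartitionsEvent(partitions)); // as in the stub
       }
   }
   ```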



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
   [... quoted file contents identical to the first hunk above elided ...]
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the

Review Comment:
   Just to clarify: the foreground thread would simply try to retrieve data from the raw records buffer that the background thread keeps accumulating into, right? Of course, the raw records may have been fetched for an old assignment, in which case they still need to be trimmed by the foreground thread.
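   
   The trimming step could then look something like this rough sketch (the map shape standing in for the raw records buffer is an assumption on my part):
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.TopicPartition;
   
   final class FetchTrimmer {
       // Keep only records whose partition is still in the foreground's current
       // assignment; fetches completed for since-revoked partitions are dropped.
       static <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> trimToAssignment(
               Map<TopicPartition, List<ConsumerRecord<K, V>>> rawRecords,
               Set<TopicPartition> currentAssignment) {
           Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new HashMap<>();
           for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : rawRecords.entrySet()) {
               if (currentAssignment.contains(entry.getKey()))
                   trimmed.put(entry.getKey(), entry.getValue());
           }
           return trimmed;
       }
   }
   ```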



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
   [... quoted file contents identical to the first hunk above elided ...]
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   While I agree this func should be non-blocking, I still want to clarify its ordering restrictions when it is used together with others like `poll`. For example, assume the following events happen on the background thread side in the `subscribe` scenario:
   
   1) The consumer is currently assigned partitions A and B.
   2) A rebalance happens, and the new assignment is updated to C and D.
   3) A notification about the new assignment is sent via the foreground queue, to let the foreground thread trigger the corresponding rebalance listener.
   4) All records for partitions A and B are dropped from the raw records buffer, and new data is fetched from partitions C and D.
   
   
   Now suppose the foreground thread calls `assignment` between steps 2) and 3): it would still return `{A, B}`. When it later calls `poll` between steps 3) and 4), it should immediately trigger the rebalance listener, and records from C and D can be returned after that.
   
   If the foreground thread calls `assignment` between steps 3) and 4), it already knows about the new assignment, so it could return `{C, D}`. HOWEVER, since it has not yet called `poll`, it would not have triggered the rebalance listener --- this is because the docs state that
   
   ```
   This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(long)} call whenever partition assignment changes.
   ```
   
   This is a bit weird since it is out of order from the caller's perspective: when I call `assignment()` and then `poll()`, I would already get the new assignment first, but the listener indicating that the assignment has changed via `onPartitionsRevoked/Assigned` would only fire later, inside the `poll`.
   
   While searching the docs I did not find anywhere that states this ordering is guaranteed, but I am concerned that there are users who assume this ordering in their application development.
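   
   To make the out-of-order observation concrete, here is what a caller could see between steps 3) and 4) above (a hypothetical snippet; `consumer` is any consumer built on this stub, and the rebalance from `{A, B}` to `{C, D}` is assumed to have already reached the foreground queue):
   
   ```java
   Set<TopicPartition> current = consumer.assignment();   // already returns {C, D}
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   // onPartitionsRevoked({A, B}) and onPartitionsAssigned({C, D}) only fire inside
   // this poll() call -- i.e. after assignment() has already exposed {C, D}.
   ```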



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
   [... quoted file contents identical to the first hunk above elided ...]
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   If we still want to respect the ordering of assignment changes from the caller's perspective here, I think we could consider either 1) letting the `assignment()` call return the stale assignment until the rebalance listener has been triggered, to stay consistent with the current behavior, or 2) changing the `listener` API so that it may also be triggered in funcs other than `poll`, e.g. in `assignment` as well.
   
   Personally I'd prefer we do 1), but would love to hear your thoughts as well @hachikuji @philipnee
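   
   For 1), a minimal sketch of what the foreground side could look like (the field and helper names are made up for illustration):
   
   ```java
   // assignment() keeps returning the snapshot that was last "applied" via the
   // rebalance listener, so callers never observe a new assignment before
   // onPartitionsAssigned() has fired -- matching today's ordering.
   private volatile Set<TopicPartition> appliedAssignment = Collections.emptySet();
   
   public Set<TopicPartition> assignment() {
       return appliedAssignment;      // stays stale until poll() applies the change
   }
   
   // called from within poll(), on the foreground thread
   private void applyAssignmentChange(ConsumerRebalanceListener listener,
                                      Set<TopicPartition> revoked,
                                      Set<TopicPartition> assigned) {
       listener.onPartitionsRevoked(revoked);
       appliedAssignment = Collections.unmodifiableSet(assigned);
       listener.onPartitionsAssigned(assigned);
   }
   ```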



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
   [... quoted file contents identical to the first hunk above elided ...]
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the
+        // potentially expensive task of deserializing the record won't stall out our background thread.
+        //for (SerializedRecordWrapper recordWrapper : wrappers) {
+        //    TopicPartition tp = recordWrapper.topicPartition();
+        //
+        //    // Make sure that this topic partition is still on our set of subscribed topics/assigned partitions,
+        //    // as this might have changed since the fetcher submitted the fetch request.
+        //    if (isRelevant(tp)) {
+        //        ConsumerRecord<K, V> record = parseRecord(recordWrapper);
+        //        List<ConsumerRecord<K, V>> list = records.computeIfAbsent(tp, __ -> new ArrayList<>());
+        //        list.add(record);
+        //    }
+        //}
+
+        return !records.isEmpty() ? new ConsumerRecords<>(records) : ConsumerRecords.empty();
+    }
+
+    /**
+     * Mostly stolen from Fetcher.CompletedFetch's parseRecord...
+     */
+    private ConsumerRecord<K, V> parseRecord(Record record,
+                                             TopicPartition partition,
+                                             TimestampType timestampType,
+                                             Optional<Integer> leaderEpoch) {
+        try {
+            long offset = record.offset();
+            long timestamp = record.timestamp();
+            Headers headers = new RecordHeaders(record.headers());
+            ByteBuffer keyBytes = record.key();
+            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            ByteBuffer valueBytes = record.value();
+            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+                    timestamp, timestampType,
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+                    key, value, headers, leaderEpoch);
+        } catch (RuntimeException e) {
+            throw new RecordDeserializationException(partition, record.offset(),
+                    "Error deserializing key/value for partition " + partition +
+                            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+        }
+    }
+
+    @Override
+    public void commitSync() {
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Duration timeout) {
+        commitSync(subscriptions.allConsumed(), timeout);
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsSync logic
+        eventHandler.addAndGet(new CommitSyncEvent(offsets), time.timer(timeout));
+    }
+
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        commitAsync(subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsAsync logic
+        eventHandler.add(new CommitAsyncEvent(offsets, callback));
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        // TODO: This needs to be propagated to the background thread, right?

Review Comment:
   Yes, I think so. The next `poll` should also respect this call, so the `poll` call itself should block until the background thread has processed this event and updated its fetch position accordingly; the records returned from the raw records buffer should be trimmed based on the new position as well.
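   
   In stub terms, `seek` could then look something like the sketch below; `SeekEvent` is a hypothetical event type (not in the PR), and the blocking shape mirrors the `addAndGet` calls already used elsewhere in this class:
   
   ```java
   @Override
   public void seek(TopicPartition partition, long offset) {
       if (offset < 0)
           throw new IllegalArgumentException("seek offset must not be a negative number");
       // Block the foreground thread until the background thread has processed the
       // event and updated its fetch position, so the next poll() observes it.
       eventHandler.addAndGet(new SeekEvent(partition, offset),
               time.timer(Duration.ofMillis(defaultApiTimeoutMs)));
   }
   ```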



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
   [... quoted file contents identical to the first hunk above elided ...]
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>None; the {@link FetchRecordsEvent} itself carries no data</li>
+     *     <li>Raw fetched records, which are deserialized on the foreground thread</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)

Review Comment:
   Again I think we need to separate the scenarios of `subscribe` vs. manual `assign`: for the former, the source of truth is on the background thread, and the foreground thread would be brought up to date asynchronously via the queues; for the latter, the foreground thread would hold the source of truth, and the background thread would be updated asynchronously.
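
   To make that concrete, here is a minimal sketch of the two ownership models. It is hypothetical (the `OwnershipSketch` class, the `foregroundView`/`toBackground` parameters, and the bare `Queue<Object>` wiring are mine, not this PR's); it only shows which thread mutates the state and which direction the async sync-up flows:

   ```
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Queue;

   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.clients.consumer.internals.SubscriptionState;
   import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
   import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
   import org.apache.kafka.common.TopicPartition;

   public class OwnershipSketch {

       // subscribe(): the background thread owns the subscription state. The
       // foreground only enqueues the request, and is brought up to date later
       // when the background publishes its state back over the other queue.
       void subscribe(Collection<String> topics,
                      ConsumerRebalanceListener listener,
                      Queue<Object> toBackground) {
           toBackground.add(new SubscribeTopicsEvent(topics, listener)); // fire-and-forget
       }

       // assign(): the foreground thread owns the assignment. It mutates its
       // own view first (the source of truth), then lets the background catch
       // up asynchronously.
       void assign(Collection<TopicPartition> partitions,
                   SubscriptionState foregroundView,
                   Queue<Object> toBackground) {
           foregroundView.assignFromUser(new HashSet<>(partitions)); // foreground is authoritative
           toBackground.add(new AssignPartitionsEvent(partitions));  // background catches up
       }
   }
   ```

   Either way, only one side ever mutates a given piece of state; the other side merely observes it via the queue.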



##########
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a <i>wireframe</i> or
+ * <i>sketch</i>, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ol>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ol>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li><i>n/a</i></li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public Set<String> subscription() {
+        return Collections.unmodifiableSet(subscriptions.subscription());
+    }
+
+    /**
+     * @see #subscribe(Collection, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Collection<String> topics) {
+        subscribe(topics, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>Topic list and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (topics == null)
+            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+        if (topics.isEmpty()) {
+            // treat subscribing to empty topic list as the same as unsubscribing
+            this.unsubscribe();
+        } else {
+            for (String topic : topics) {
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+            }
+
+            throwIfNoAssignorsConfigured();
+            eventHandler.add(new SubscribeTopicsEvent(topics, callback));
+        }
+    }
+
+    /**
+     * @see #subscribe(Pattern, ConsumerRebalanceListener)
+     */
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes, except in cases of bad input data</li>
+     *     <li>{@link Pattern} and {@link ConsumerRebalanceListener}</li>
+     *     <li><i>n/a</i></li>
+     * </ol>
+     */
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                    "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        eventHandler.add(new SubscribePatternEvent(pattern, callback));
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>None</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void unsubscribe() {
+        eventHandler.add(new UnsubscribeEvent());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>No</li>
+     *     <li>Yes</li>
+     *     <li>List of {@link TopicPartition partitions}</li>
+     *     <li>None</li>
+     * </ol>
+     */
+    @Override
+    public void assign(Collection<TopicPartition> partitions) {
+        if (partitions == null) {
+            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
+        } else if (partitions.isEmpty()) {
+            this.unsubscribe();
+        } else {
+            for (TopicPartition tp : partitions) {
+                String topic = (tp != null) ? tp.topic() : null;
+                if (Utils.isBlank(topic))
+                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+            }
+
+            eventHandler.add(new AssignPartitionsEvent(partitions));
+        }
+    }
+
+    @Deprecated
+    @Override
+    public ConsumerRecords<K, V> poll(final long timeoutMs) {
+        return poll(time.timer(timeoutMs), false);
+    }
+
+    @Override
+    public ConsumerRecords<K, V> poll(Duration timeout) {
+        return poll(time.timer(timeout), true);
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ol>
+     *     <li>Yes</li>
+     *     <li>Yes</li>
+     *     <li>None; the {@link FetchRecordsEvent} itself carries no data</li>
+     *     <li>Raw fetched records, which are deserialized on the foreground thread</li>
+     * </ol>
+     */
+    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Execute the 'update assignment metadata' logic (which I don't really understand at all, yet...)
+        //    Update the assignments in the consumer coordinator.
+        //    (hand waving) something about position validation
+        //    Fetch committed offsets, if needed
+        //    Reset positions & offsets
+        // 2. Collect and return any previously loaded fetches
+        // 3. Submit fetch requests for any ready partitions, including any we might have collected
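+        //
+        // Note: addAndGet() presumably blocks until the background thread completes
+        // the event or the timer expires, whichever comes first.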
+        Fetcher.CompletedFetch completedFetch = eventHandler.addAndGet(new FetchRecordsEvent(), timer);
+
+        // We return the raw fetch records from the background thread to the foreground thread so that the
+        // potentially expensive task of deserializing the record won't stall out our background thread.
+        //for (SerializedRecordWrapper recordWrapper : wrappers) {
+        //    TopicPartition tp = recordWrapper.topicPartition();
+        //
+        //    // Make sure that this topic partition is still on our set of subscribed topics/assigned partitions,
+        //    // as this might have changed since the fetcher submitted the fetch request.
+        //    if (isRelevant(tp)) {
+        //        ConsumerRecord<K, V> record = parseRecord(recordWrapper);
+        //        List<ConsumerRecord<K, V>> list = records.computeIfAbsent(tp, __ -> new ArrayList<>());
+        //        list.add(record);
+        //    }
+        //}
+
+        return !records.isEmpty() ? new ConsumerRecords<>(records) : ConsumerRecords.empty();
+    }
+
+    /**
+     * Mostly stolen from Fetcher.CompletedFetch's parseRecord...
+     */
+    private ConsumerRecord<K, V> parseRecord(Record record,
+                                             TopicPartition partition,
+                                             TimestampType timestampType,
+                                             Optional<Integer> leaderEpoch) {
+        try {
+            long offset = record.offset();
+            long timestamp = record.timestamp();
+            Headers headers = new RecordHeaders(record.headers());
+            ByteBuffer keyBytes = record.key();
+            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            ByteBuffer valueBytes = record.value();
+            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
+                    timestamp, timestampType,
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
+                    key, value, headers, leaderEpoch);
+        } catch (RuntimeException e) {
+            throw new RecordDeserializationException(partition, record.offset(),
+                    "Error deserializing key/value for partition " + partition +
+                            " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+        }
+    }
+
+    @Override
+    public void commitSync() {
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Duration timeout) {
+        commitSync(subscriptions.allConsumed(), timeout);
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsSync logic
+        eventHandler.addAndGet(new CommitSyncEvent(offsets), time.timer(timeout));
+    }
+
+    @Override
+    public void commitAsync() {
+        commitAsync(null);
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        commitAsync(subscriptions.allConsumed(), callback);
+    }
+
+    @Override
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        // The background thread will do the following when it receives this event:
+        //
+        // 1. Metadata will update its 'last seen' epoch, if newer
+        // 2. Invoke the coordinator commitOffsetsAsync logic
+        eventHandler.add(new CommitAsyncEvent(offsets, callback));
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        // TODO: This needs to be propagated to the background thread, right?
+        subscriptions.seek(partition, offset);
+    }
+
+    @Override
+    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
+        // TODO: This needs to be propagated to the background thread, right?
+    }
+
+    @Override
+    public void seekToBeginning(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public void seekToEnd(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public long position(TopicPartition partition) {

Review Comment:
   Again I'd like to emphasize the ordering of this function relative to the other functions here. Just to demo, here's an example:
   
   ```
   0: **Buffered** [0, 10), no acked **position** offset, **fetch** offset 10 (i.e. for the next fetch request).
   1: Foreground **poll** returned records [0, 5). **Buffered** [5, 10), acked **position** offset 0, **fetch** offset 10.
   2: Background gets records [10, 20) from fetch response. **Buffered** [5, 20), acked **position** offset 0, **fetch** offset 20.
   3: Foreground **position** call returns 0 still.
   4: Foreground **poll** returned records [5, 15). **Buffered** [15, 20), acked **position** offset 5, **fetch** offset 20.
   5: Foreground **position** call returns 5.
   6: Foreground **seekToBeginning**. **Buffered** [15, 20), acked **position** offset 0, **fetch** offset 20.
   7: Foreground **position** call returns 0.
   8: Foreground **poll** is called again; it cannot return any data yet, since the buffered records do not contain any for the starting position.
   9: Background clears the buffered records, resets its **fetch** offset to 0 and sends the next fetch request.
   10: Background finally gets some data from the fetch response, say [0, 15); until this is done, all **poll** calls from the foreground should return nothing.
   11: The next **poll** call could then return records [0, 10), acked **position** offset is still 0, and **fetch** offset is 15.
   ```
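
   If it helps, the trace can be replayed mechanically with a tiny model of just those three pieces of state. This is a hypothetical sketch (the `OffsetTrace` class and its method names are mine, not this PR's), under the assumption that the `position` call reports the acked position rather than the in-flight one:

   ```
   // Models the trace above: the buffered record range, the acked position
   // offset, and the fetch offset for the next background request.
   public class OffsetTrace {
       long position = 0;                        // where the next poll() reads from
       long ackedPosition = -1;                  // -1 == "no acked position offset" (step 0)
       long bufferedStart = 0, bufferedEnd = 10; // buffered records: [start, end)
       long fetchOffset = 10;                    // offset of the next fetch request

       // "Foreground position call": reports the acked position, which lags
       // the actual consumption by one poll.
       long positionCall() { return ackedPosition; }

       // Foreground poll: returns up to maxRecords, but only if the buffer
       // covers the current position (hence step 8 returns nothing).
       long[] poll(long maxRecords) {
           if (position < bufferedStart || position >= bufferedEnd)
               return new long[] {position, position}; // empty range
           long from = position;
           long to = Math.min(bufferedEnd, from + maxRecords);
           ackedPosition = from; // poll acks the position it started from
           position = to;
           bufferedStart = to;   // consumed records leave the buffer
           return new long[] {from, to};
       }

       // Background: a fetch response extends the buffer and the fetch offset.
       void onFetchResponse(long from, long to) {
           if (bufferedStart == bufferedEnd) bufferedStart = from; // buffer was empty
           bufferedEnd = to;
           fetchOffset = to;
       }

       // Foreground seekToBeginning (step 6)...
       void seekToBeginning() {
           position = 0;
           ackedPosition = 0;
       }

       // ...and the background's asynchronous reaction to it (step 9).
       void backgroundHandleSeek() {
           bufferedStart = bufferedEnd = 0; // drop the buffered records
           fetchOffset = 0;                 // next fetch restarts from the seek target
       }
   }
   ```

   Calling `poll(5)`, `onFetchResponse(10, 20)`, `poll(10)`, `seekToBeginning()`, `poll(10)` (empty), `backgroundHandleSeek()`, `onFetchResponse(0, 15)`, and `poll(10)` reproduces steps 1 through 11 of the trace.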


