Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/30 00:34:53 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refactor find coordinator

hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1035410111


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+    }
+
+    public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+        if (!disableWakeup) {
+            // trigger wakeups after checking for disconnects so that the callbacks will be ready
+            // to be fired on the next call to poll()
+            maybeTriggerWakeup();
+        }
+
+        trySend();
+        return this.client.poll(timer.timeoutMs(), time.milliseconds());
+    }
+
+    private void trySend() {
+        while (unsentRequests.size() > 0) {
+            UnsentRequest unsent = unsentRequests.poll();
+            if (unsent.timer.isExpired()) {
+                // TODO: expired request should be marked
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            doSend(unsent);
+        }
+    }
+
+    static boolean isReady(KafkaClient client, Node node, long currentTime) {
+        client.poll(0, currentTime);
+        return client.isReady(node, currentTime);
+    }
+
+    public void doSend(UnsentRequest r) {
+        long now = time.milliseconds();
+        Node node = r.node.orElse(client.leastLoadedNode(now));

Review Comment:
   `leastLoadedNode` may return null if no nodes are available. We should retry later in that case.
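   Something along these lines is what I had in mind (just a sketch; buffering the not-ready requests back into the queue is one option, and `doSend` returning a boolean is only for illustration):
   ```java
   private void trySend() {
       List<UnsentRequest> notSent = new ArrayList<>();
       while (!unsentRequests.isEmpty()) {
           UnsentRequest unsent = unsentRequests.poll();
           if (unsent.timer.isExpired()) {
               unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
                       "Failed to send request after " + unsent.timer.timeoutMs() + " ms.")));
               continue;
           }
           if (!doSend(unsent)) {
               // no node available (or node not ready) right now; retry on the next poll
               notSent.add(unsent);
           }
       }
       unsentRequests.addAll(notSent);
   }

   private boolean doSend(UnsentRequest r) {
       long now = time.milliseconds();
       Node node = r.node.orElse(client.leastLoadedNode(now));
       if (node == null || !isReady(client, node, now))
           return false;
       client.send(makeClientRequest(r, node), now);
       return true;
   }
   ```
   (Assumes a `java.util.ArrayList` import in addition to the existing ones.)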



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;

Review Comment:
   nit: it is conventional to have all of the `final` fields listed together.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(

Review Comment:
   nit: there's a bit of duplication between here and line 86. Can we pull out a small helper?
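   For example (the helper name is just a suggestion):
   ```java
   private NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest() {
       return new NetworkClientDelegate.UnsentRequest(
               time.timer(requestTimeoutMs),
               getFindCoordinatorRequest(),
               new FindCoordinatorRequestHandler());
   }
   ```
   and then both return paths become `return Optional.of(makeFindCoordinatorRequest());`.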



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - lastTimeOfConnectionMs;

Review Comment:
   We need to be a little careful with measurements like this, since `milliseconds()` is not guaranteed to be monotonic. There's probably no harm here if the value is less than 0, but perhaps we should use `Math.max(0, XX)` at least to make the log message less confusing.
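   i.e. something like:
   ```java
   // clamp to zero in case milliseconds() moves backwards between the two reads
   long durationOfOngoingDisconnect = Math.max(0, time.milliseconds() - lastTimeOfConnectionMs);
   ```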



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - lastTimeOfConnectionMs;
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.warn("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnect);
+        }
+        return Optional.ofNullable(oldCoordinator);
+    }
+
+    private AbstractRequest.Builder<?> getFindCoordinatorRequest() {
+        coordinatorRequestState.updateLastSend();
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId.orElse(null));
+        return new FindCoordinatorRequest.Builder(data);
+    }
+
+    void handleSuccessFindCoordinatorResponse(FindCoordinatorResponse response) {
+        List<FindCoordinatorResponseData.Coordinator> coordinators = response.coordinators();
+        if (coordinators.size() != 1) {
+            log.error(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator");
+            enqueueErrorEvent(new IllegalStateException(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
+        }
+        FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0);
+        Errors error = Errors.forCode(coordinatorData.errorCode());
+        if (error == Errors.NONE) {
+            // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+            // for the coordinator in the underlying network client layer
+            int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();
+
+            this.coordinator = new Node(
+                    coordinatorConnectionId,
+                    coordinatorData.host(),
+                    coordinatorData.port());
+            log.info("Discovered group coordinator {}", coordinator);
+            coordinatorRequestState.reset();
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            enqueueErrorEvent(GroupAuthorizationException.forGroupId(this.groupId.orElse(null)));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage());
+        enqueueErrorEvent(error.exception());
+    }
+
+    void handleFailedCoordinatorResponse(FindCoordinatorResponse response) {
+        log.debug("FindCoordinator request failed due to {}", response.error().toString());
+
+        if (!(response.error().exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", response.error().exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            enqueueErrorEvent(response.error().exception());
+        }
+
+        log.debug("Coordinator discovery failed, refreshing metadata", response.error().exception());
+    }
+
+    /**
+     * Handle the 3 cases of FindCoordinatorResponse: success, failure, and timeout.
+     *
+     * @param response FindCoordinator response
+     */
+    public void onResponse(FindCoordinatorResponse response) {
+        coordinatorRequestState.updateLastReceived();
+        if (response.hasError()) {
+            handleFailedCoordinatorResponse(response);
+            return;
+        }
+        handleSuccessFindCoordinatorResponse(response);
+    }
+
+    private void enqueueErrorEvent(Exception e) {
+        backgroundEventQueue.add(new ErrorBackgroundEvent(e));
+    }
+
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    private class CoordinatorRequestState {
+        private long lastSentMs;
+        private long lastReceivedMs;
+        private int numAttempts;
+        public CoordinatorRequestState() {

Review Comment:
   nit: can we leave out the constructor and initialize the fields directly?
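   For example (assuming the constructor only sets the sentinel values used above):
   ```java
   private class CoordinatorRequestState {
       private long lastSentMs = -1;
       private long lastReceivedMs = -1;
       private int numAttempts = 0;
   }
   ```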



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,34 +129,31 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Process events from a single poll. It performs the following tasks sequentially:
+     *  1. Try to poll an event from the queue and process it.
+     *  2. Try to find the coordinator if needed.
+     *  3. Try to send fetches.
+     *  4. Poll the networkClient for outstanding requests. If none, poll until the next
+     *  iteration.
      */
     void runOnce() {
+        // TODO: we might not need the inflightEvent here

Review Comment:
   Can we just get rid of it if we're not using it yet?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;

Review Comment:
   Instead of passing a reference to the queue, perhaps we can have an `ExceptionHandler` type. This avoids the tight coupling with the background thread.
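   Something like the following (the interface name and shape are just for illustration):
   ```java
   @FunctionalInterface
   interface ErrorEventHandler {
       void handle(Exception e);
   }
   ```
   `CoordinatorManager` would then take an `ErrorEventHandler` instead of the queue, and the background thread could pass `e -> backgroundEventQueue.add(new ErrorBackgroundEvent(e))` when constructing it.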



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {

Review Comment:
   What is the use case where we want the old coordinator?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - lastTimeOfConnectionMs;
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.warn("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnect);
+        }
+        return Optional.ofNullable(oldCoordinator);
+    }
+
+    private AbstractRequest.Builder<?> getFindCoordinatorRequest() {

Review Comment:
   nit: how about `newFindCoordinatorRequest`? Also, we may as well use the more specific type `FindCoordinatorRequest.Builder`.
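   i.e. something like:
   ```java
   private FindCoordinatorRequest.Builder newFindCoordinatorRequest() {
       coordinatorRequestState.updateLastSend();
       FindCoordinatorRequestData data = new FindCoordinatorRequestData()
               .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
               .setKey(this.groupId.orElse(null));
       return new FindCoordinatorRequest.Builder(data);
   }
   ```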



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle poll and send operations.
+ */
+public class NetworkClientDelegate implements AutoCloseable {
+    private final KafkaClient client;
+    private final Time time;
+    private final Logger log;
+    private boolean wakeup = false;
+    private final Queue<UnsentRequest> unsentRequests;
+
+    public NetworkClientDelegate(
+            final Time time,
+            final LogContext logContext,
+            final KafkaClient client) {
+        this.time = time;
+        this.client = client;
+        this.log = logContext.logger(getClass());
+        this.unsentRequests = new ArrayDeque<>();
+    }
+
+    public List<ClientResponse> poll(Timer timer, boolean disableWakeup) {
+        if (!disableWakeup) {
+            // trigger wakeups after checking for disconnects so that the callbacks will be ready
+            // to be fired on the next call to poll()
+            maybeTriggerWakeup();
+        }
+
+        trySend();
+        return this.client.poll(timer.timeoutMs(), time.milliseconds());
+    }
+
+    private void trySend() {
+        while (unsentRequests.size() > 0) {
+            UnsentRequest unsent = unsentRequests.poll();
+            if (unsent.timer.isExpired()) {
+                // TODO: expired request should be marked
+                unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException(
+                        "Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")));
+                continue;
+            }
+
+            doSend(unsent);
+        }
+    }
+
+    static boolean isReady(KafkaClient client, Node node, long currentTime) {
+        client.poll(0, currentTime);
+        return client.isReady(node, currentTime);
+    }
+
+    public void doSend(UnsentRequest r) {
+        long now = time.milliseconds();
+        Node node = r.node.orElse(client.leastLoadedNode(now));
+        ClientRequest request = makeClientRequest(r, node);
+        // TODO: Sounds like we need to check disconnections for each node and complete the request with
+        //  authentication error
+        if (isReady(client, node, now)) {
+            client.send(request, now);
+        }
+    }
+
+    private ClientRequest makeClientRequest(UnsentRequest unsent, Node node) {
+        return client.newClientRequest(
+                node.idString(),
+                unsent.abstractBuilder,
+                time.milliseconds(),
+                true,
+                (int) unsent.timer.remainingMs(),
+                unsent.callback.orElse(new RequestFutureCompletionHandlerBase()));
+    }
+
+    public List<ClientResponse> poll() {
+        return this.poll(time.timer(0), false);
+    }
+
+    public void maybeTriggerWakeup() {
+        if (this.wakeup) {
+            this.wakeup = false;
+            throw new WakeupException();
+        }
+    }
+
+    public void wakeup() {
+        this.wakeup = true;
+        this.client.wakeup();
+    }
+
+    public Node leastLoadedNode() {
+        return this.client.leastLoadedNode(time.milliseconds());
+    }
+
+    public void add(UnsentRequest r) {
+        unsentRequests.add(r);
+    }
+
+    public void ready(Node node) {
+        client.ready(node, time.milliseconds());
+    }
+
+    /**
+     * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
+     * the reconnect backoff window following the disconnect).
+     */
+    public boolean nodeUnavailable(Node node) {
+        return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
+    }
+
+    public void tryDisconnect(Optional<Node> coordinator) {
+        coordinator.ifPresent(node -> client.disconnect(node.idString()));
+    }
+
+    public void close() throws IOException {
+        this.client.close();
+    }
+
+    public static class UnsentRequest {
+        private final Optional<RequestFutureCompletionHandlerBase> callback;
+        private final AbstractRequest.Builder abstractBuilder;
+        private final Optional<Node> node; // empty if a random node can be chosen
+        private final Timer timer;
+
+        public UnsentRequest(final Timer timer,
+                             final AbstractRequest.Builder abstractBuilder,
+                             final RequestFutureCompletionHandlerBase callback) {
+            this(timer, abstractBuilder, callback, null);
+        }
+
+        public UnsentRequest(final Timer timer,
+                             final AbstractRequest.Builder abstractBuilder,
+                             final RequestFutureCompletionHandlerBase callback,
+                             final Node node) {
+            Objects.requireNonNull(abstractBuilder);
+            this.abstractBuilder = abstractBuilder;
+            this.node = Optional.ofNullable(node);
+            this.callback = Optional.ofNullable(callback);
+            this.timer = timer;
+        }
+    }
+
+    public static class RequestFutureCompletionHandlerBase implements RequestCompletionHandler {
+        private final RequestFuture<ClientResponse> future;
+        private ClientResponse response;
+        private RuntimeException e;
+
+        RequestFutureCompletionHandlerBase() {
+            this.future = new RequestFuture<>();
+        }
+
+        public void fireCompletion() {
+            if (e != null) {
+                future.raise(e);
+            } else if (response.authenticationException() != null) {
+                future.raise(response.authenticationException());
+            } else if (response.wasDisconnected()) {
+                //log.debug("Cancelled request with header {} due to node {} being disconnected", response
+                // .requestHeader(), response.destination());
+                future.raise(DisconnectException.INSTANCE);
+            } else if (response.versionMismatch() != null) {
+                future.raise(response.versionMismatch());
+            } else {
+                future.complete(response);
+            }
+        }
+
+        public void onFailure(RuntimeException e) {
+            this.e = e;
+            fireCompletion();
+            handleResponse(response, e);
+        }
+
+        public void handleResponse(ClientResponse r, Throwable t) {}

Review Comment:
   This could be abstract if we make the class itself abstract?
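   For example (just sketching the shape):
   ```java
   public abstract static class RequestFutureCompletionHandlerBase implements RequestCompletionHandler {
       // ... existing fields, fireCompletion() and onFailure() stay as they are ...

       // force each concrete handler to supply its own response handling
       public abstract void handleResponse(ClientResponse r, Throwable t);
   }
   ```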



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -145,19 +162,52 @@ void runOnce() {
             this.inflightEvent = Optional.empty();
         }
 
+

Review Comment:
   nit: a bunch of unneeded newlines in here



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // start logging a warning only after being unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest when we need to send a FindCoordinatorRequest. The conditions are:
+     * 1. No request has been sent yet, or
+     * 2. The previous request failed and the retryBackoff has expired.
+     * @return Optional UnsentRequest. Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - lastTimeOfConnectionMs;
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.warn("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnect);

Review Comment:
   Not sure if warn is suitable for this. Maybe debug?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -145,19 +162,52 @@ void runOnce() {
             this.inflightEvent = Optional.empty();
         }
 
+
+
+        if (shouldFindCoordinator() && coordinatorUnknown()) {
+            coordinatorManager.tryFindCoordinator().ifPresent(networkClientDelegate::add);
+        }
+
         // if there are pending events to process, poll then continue without
         // blocking.
         if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+            networkClientDelegate.poll();
             return;
         }
         // if there are no events to process, poll until timeout. The timeout
         // will be the minimum of the requestTimeoutMs, nextHeartBeatMs, and
         // nextMetadataUpdate. See NetworkClient.poll impl.
-        networkClient.poll(time.timer(timeToNextHeartbeatMs(time.milliseconds())));
+        networkClientDelegate.poll(time.timer(timeToNextHeartbeatMs()), false);
+    }
+
+    /**
+     * Get the coordinator if its connection is still active. Otherwise, mark it unknown and
+     * return an empty optional node.
+     *
+     * @return coordinator node. Empty if it is unknown.
+     */
+    protected Optional<Node> checkAndGetCoordinator(Node coordinator) {
+        // If the current coordinator is unavailable, mark it unknown and disconnect it
+        if (coordinator != null && networkClientDelegate.nodeUnavailable(coordinator)) {
+            log.info("Requesting disconnect from last known coordinator {}", coordinator);
+            networkClientDelegate.tryDisconnect(
+                    this.coordinatorManager.markCoordinatorUnknown("coordinator unavailable"));
+            return Optional.empty();
+        }
+        return Optional.ofNullable(coordinator);
+    }
+
+    public boolean coordinatorUnknown() {
+        return !checkAndGetCoordinator(coordinatorManager.coordinator()).isPresent();
+    }
+
+    private boolean shouldFindCoordinator() {
+        // TODO: add conditions for coordinator discovery. Example: when there are pending commits, or we have

Review Comment:
   If we're not using it here, let's leave it for a future PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org