You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/01 00:52:01 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12862: Consumer refator find coordinator

hachikuji commented on code in PR #12862:
URL: https://github.com/apache/kafka/pull/12862#discussion_r1036577092


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final ErrorEventHandler errorHandler,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),

Review Comment:
   nit: we may as well use `ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG` to be consistent



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final ErrorEventHandler errorHandler,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest we need send a FindCoordinatorRequest. These conditions are:
+     * 1. The request has not been sent
+     * 2. If the previous request failed, and the retryBackoff has expired
+     * @return Optional UnsentRequest.  Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return makeFindCoordinatorRequest();
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return makeFindCoordinatorRequest();
+    }
+
+    private Optional<NetworkClientDelegate.UnsentRequest> makeFindCoordinatorRequest() {
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                newFindCoordinatorRequest(),

Review Comment:
   nit: since there's just one caller of `newFindCoordinatorRequest`, maybe we can inline it here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;

Review Comment:
   We have to be a little careful with calls to `Time.milliseconds()` since they can be expensive when overused. Usually we would structure the poll loop to compute the current time at the start of the event loop and then pass it to each component (e.g. instead of `tryFindCoordinator()`, we would use `tryFindCoordinator(currentTimeMs)`).



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private Node coordinator;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final ExponentialBackoff exponentialBackoff;
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+    private final CoordinatorRequestState coordinatorRequestState;
+
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.backgroundEventQueue = backgroundEventQueue;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.coordinatorRequestState = new CoordinatorRequestState();
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest we need send a FindCoordinatorRequest. These conditions are:
+     * 1. The request has not been sent
+     * 2. If the previous request failed, and the retryBackoff has expired
+     * @return Optional UnsentRequest.  Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return Optional.of(
+                    new NetworkClientDelegate.UnsentRequest(
+                            this.time.timer(requestTimeoutMs),
+                            getFindCoordinatorRequest(),
+                            new FindCoordinatorRequestHandler()));
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new NetworkClientDelegate.UnsentRequest(
+                        this.time.timer(requestTimeoutMs),
+                        getFindCoordinatorRequest(),
+                        new FindCoordinatorRequestHandler()));
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = time.milliseconds() - lastTimeOfConnectionMs;
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.warn("Consumer has been disconnected from the group coordinator for {}ms", durationOfOngoingDisconnect);
+        }
+        return Optional.ofNullable(oldCoordinator);
+    }
+
+    private AbstractRequest.Builder<?> getFindCoordinatorRequest() {
+        coordinatorRequestState.updateLastSend();
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId.orElse(null));
+        return new FindCoordinatorRequest.Builder(data);
+    }
+
+    void handleSuccessFindCoordinatorResponse(FindCoordinatorResponse response) {
+        List<FindCoordinatorResponseData.Coordinator> coordinators = response.coordinators();
+        if (coordinators.size() != 1) {
+            log.error(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator");
+            enqueueErrorEvent(new IllegalStateException(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
+        }
+        FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0);
+        Errors error = Errors.forCode(coordinatorData.errorCode());
+        if (error == Errors.NONE) {
+            // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+            // for the coordinator in the underlying network client layer
+            int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();
+
+            this.coordinator = new Node(
+                    coordinatorConnectionId,
+                    coordinatorData.host(),
+                    coordinatorData.port());
+            log.info("Discovered group coordinator {}", coordinator);
+            coordinatorRequestState.reset();
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            enqueueErrorEvent(GroupAuthorizationException.forGroupId(this.groupId.orElse(null)));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage());
+        enqueueErrorEvent(error.exception());
+    }
+
+    void handleFailedCoordinatorResponse(FindCoordinatorResponse response) {
+        log.debug("FindCoordinator request failed due to {}", response.error().toString());
+
+        if (!(response.error().exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", response.error().exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            enqueueErrorEvent(response.error().exception());
+        }
+
+        log.debug("Coordinator discovery failed, refreshing metadata", response.error().exception());
+    }
+
+    /**
+     * Handle 3 cases of FindCoordinatorResponse: success, failure, timedout
+     *
+     * @param response FindCoordinator response
+     */
+    public void onResponse(FindCoordinatorResponse response) {
+        coordinatorRequestState.updateLastReceived();
+        if (response.hasError()) {
+            handleFailedCoordinatorResponse(response);
+            return;
+        }
+        handleSuccessFindCoordinatorResponse(response);
+    }
+
+    private void enqueueErrorEvent(Exception e) {
+        backgroundEventQueue.add(new ErrorBackgroundEvent(e));
+    }
+
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    private class CoordinatorRequestState {
+        private long lastSentMs;
+        private long lastReceivedMs;
+        private int numAttempts;
+        public CoordinatorRequestState() {

Review Comment:
   I just mean we can do this:
   ```java
           private long lastSentMs = -1;
           private long lastReceivedMs = -1;
           private int numAttempts = 0;
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();

Review Comment:
   Can this be `final` now that we have added the `reset` method?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;

Review Comment:
   It looks like 0.2 is a typical jitter that is used.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -145,19 +166,46 @@ void runOnce() {
             this.inflightEvent = Optional.empty();
         }
 
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorUnknown()) {
+            coordinatorManager.tryFindCoordinator().ifPresent(networkClientDelegate::add);
+        }
+
         // if there are pending events to process, poll then continue without
         // blocking.
         if (!applicationEventQueue.isEmpty() || inflightEvent.isPresent()) {
-            networkClient.poll(time.timer(0));
+            networkClientDelegate.poll();

Review Comment:
   nit: why don't we just set the poll timeout to 0 in this case?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;

Review Comment:
   Could we move this to `CoordinatorRequestState`? It looks like that's the only place it is used.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while

Review Comment:
   Perhaps a more accurate name given the implementation is `timeMarkedUnknownMs` or something like that?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final ErrorEventHandler errorHandler,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest we need send a FindCoordinatorRequest. These conditions are:
+     * 1. The request has not been sent
+     * 2. If the previous request failed, and the retryBackoff has expired
+     * @return Optional UnsentRequest.  Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return makeFindCoordinatorRequest();
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||
+                coordinatorRequestState.lastReceivedMs < coordinatorRequestState.lastSentMs) {
+            // there is an inflight request
+            return Optional.empty();
+        }
+
+        if (!coordinatorRequestState.requestBackoffExpired()) {
+            // retryBackoff
+            return Optional.empty();
+        }
+
+        return makeFindCoordinatorRequest();
+    }
+
+    private Optional<NetworkClientDelegate.UnsentRequest> makeFindCoordinatorRequest() {
+        return NetworkClientDelegate.UnsentRequest.makeUnsentRequest(
+                this.time.timer(requestTimeoutMs),
+                newFindCoordinatorRequest(),
+                new FindCoordinatorRequestHandler());
+    }
+
+    /**
+     * Mark the current coordinator null and return the old coordinator. Return an empty Optional
+     * if the current coordinator is unknown.
+     * @param cause why the coordinator is marked unknown
+     * @return Optional coordinator node that can be null.
+     */
+    protected Optional<Node> markCoordinatorUnknown(String cause) {
+        Node oldCoordinator = this.coordinator;
+        if (this.coordinator != null) {
+            log.info("Group coordinator {} is unavailable or invalid due to cause: {}. "
+                            + "Rediscovery will be attempted.", this.coordinator, cause);
+            this.coordinator = null;
+            lastTimeOfConnectionMs = time.milliseconds();
+        } else {
+            long durationOfOngoingDisconnect = Math.max(0, time.milliseconds() - lastTimeOfConnectionMs);
+            if (durationOfOngoingDisconnect > this.rebalanceTimeoutMs)
+                log.debug("Consumer has been disconnected from the group coordinator for {}ms",
+                        durationOfOngoingDisconnect);
+        }
+        return Optional.ofNullable(oldCoordinator);
+    }
+
+    private FindCoordinatorRequest.Builder newFindCoordinatorRequest() {
+        coordinatorRequestState.updateLastSend();
+        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
+                .setKey(this.groupId.orElse(null));
+        return new FindCoordinatorRequest.Builder(data);
+    }
+
+    void handleSuccessFindCoordinatorResponse(FindCoordinatorResponse response) {
+        List<FindCoordinatorResponseData.Coordinator> coordinators = response.coordinators();
+        if (coordinators.size() != 1) {
+            log.error(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator");
+            errorHandler.handle(new IllegalStateException(
+                    "Group coordinator lookup failed: Invalid response containing more than a single coordinator"));
+        }
+        FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0);
+        Errors error = Errors.forCode(coordinatorData.errorCode());
+        if (error == Errors.NONE) {
+            // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+            // for the coordinator in the underlying network client layer
+            int coordinatorConnectionId = Integer.MAX_VALUE - coordinatorData.nodeId();
+
+            this.coordinator = new Node(
+                    coordinatorConnectionId,
+                    coordinatorData.host(),
+                    coordinatorData.port());
+            log.info("Discovered group coordinator {}", coordinator);
+            coordinatorRequestState.reset();
+            return;
+        }
+
+        if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+            errorHandler.handle(GroupAuthorizationException.forGroupId(this.groupId.orElse(null)));
+            return;
+        }
+
+        log.debug("Group coordinator lookup failed: {}", coordinatorData.errorMessage());
+        errorHandler.handle(error.exception());
+    }
+
+    void handleFailedCoordinatorResponse(FindCoordinatorResponse response) {
+        log.debug("FindCoordinator request failed due to {}", response.error().toString());
+
+        if (!(response.error().exception() instanceof RetriableException)) {
+            log.info("FindCoordinator request hit fatal exception", response.error().exception());
+            // Remember the exception if fatal so we can ensure
+            // it gets thrown by the main thread
+            errorHandler.handle(response.error().exception());
+        }
+
+        log.debug("Coordinator discovery failed, refreshing metadata", response.error().exception());
+    }
+
+    /**
+     * Handle 3 cases of FindCoordinatorResponse: success, failure, timedout
+     *
+     * @param response FindCoordinator response
+     */
+    public void onResponse(FindCoordinatorResponse response) {
+        coordinatorRequestState.updateLastReceived();
+        if (response.hasError()) {
+            handleFailedCoordinatorResponse(response);
+            return;
+        }
+        handleSuccessFindCoordinatorResponse(response);
+    }
+
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    private class CoordinatorRequestState {
+        private long lastSentMs;
+        private long lastReceivedMs;
+        private int numAttempts;
+        public CoordinatorRequestState() {
+            this.lastSentMs = -1;
+            this.lastReceivedMs = -1;
+            this.numAttempts = 0;
+        }
+
+        public void reset() {

Review Comment:
   Shouldn't we reset `numAttempts` here as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -145,19 +166,46 @@ void runOnce() {
             this.inflightEvent = Optional.empty();
         }
 
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorUnknown()) {

Review Comment:
   nit: I'd suggest moving this check into `CoordinatorManager`. We could change the name from `tryFindCoordinator` to `maybeFindCoordinator`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorManager.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+public class CoordinatorManager {
+    final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    final static double RECONNECT_BACKOFF_JITTER = 0.0;
+
+    private final Logger log;
+    private final Time time;
+    private final long requestTimeoutMs;
+    private final ErrorEventHandler errorHandler;
+    private final ExponentialBackoff exponentialBackoff;
+    private final long rebalanceTimeoutMs;
+    private final Optional<String> groupId;
+
+    private CoordinatorRequestState coordinatorRequestState = new CoordinatorRequestState();
+    private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
+    private Node coordinator;
+
+
+    public CoordinatorManager(final Time time,
+                              final LogContext logContext,
+                              final ConsumerConfig config,
+                              final ErrorEventHandler errorHandler,
+                              final Optional<String> groupId,
+                              final long rebalanceTimeoutMs) {
+        this.time = time;
+        this.log = logContext.logger(this.getClass());
+        this.errorHandler = errorHandler;
+        this.exponentialBackoff = new ExponentialBackoff(
+                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                RECONNECT_BACKOFF_EXP_BASE,
+                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                RECONNECT_BACKOFF_JITTER);
+        this.groupId = groupId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    /**
+     * Returns a non-empty UnsentRequest we need send a FindCoordinatorRequest. These conditions are:
+     * 1. The request has not been sent
+     * 2. If the previous request failed, and the retryBackoff has expired
+     * @return Optional UnsentRequest.  Empty if we are not allowed to send a request.
+     */
+    public Optional<NetworkClientDelegate.UnsentRequest> tryFindCoordinator() {
+        if (coordinatorRequestState.lastSentMs == -1) {
+            // no request has been sent
+            return makeFindCoordinatorRequest();
+        }
+
+        if (coordinatorRequestState.lastReceivedMs == -1 ||

Review Comment:
   I think we can move this check to `CoordinatorRequestState` and give it a nice name. For example, `isRequestInflight`. 
   
   Or perhaps we could even combine all of these checks into a single check? Maybe `canSendRequest`? Then we could structure this as the following:
   
   ```java
   public Optional<NetworkClientDelegate.UnsentRequest> maybeFindCoordinator() {
     if (coordinator != null) {
       return Optional.empty();
     }
   
     if (coordinatorRequestState.canSendRequest()) {
       return makeFindCoordinatorRequest();
     } 
   
     return Optional.empty();
   }
   ```
       



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -145,19 +166,46 @@ void runOnce() {
             this.inflightEvent = Optional.empty();
         }
 
+        // TODO: Add a condition here, like shouldFindCoordinator in the future.  Since we don't always need to find
+        //  the coordinator.
+        if (coordinatorUnknown()) {
+            coordinatorManager.tryFindCoordinator().ifPresent(networkClientDelegate::add);

Review Comment:
   If `CoordinatorManager` is backing off after a failure, we want to ensure that the network thread gets woken up after the backoff has expired. Perhaps instead of returning `UnsentRequest` directly, we can create a new object:
   
   ```java
   class PollResult {
     int millisUntilNextEvent;
     List<UnsentRequest> requestsToSend;
   }
   ```
   Then perhaps we could structure each manager class in a similar way. Instead of `tryFindCoordinator`, we could have a generic `poll()`. For example:
   ```java
   class CoordinatorManager {
     PollResult poll(long currentTimeMs);
   }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##########
@@ -109,34 +129,31 @@ public void run() {
                 try {
                     runOnce();
                 } catch (final WakeupException e) {
-                    log.debug(
-                        "Exception thrown, background thread won't terminate",
-                        e
-                    );
-                    // swallow the wakeup exception to prevent killing the
-                    // background thread.
+                    log.debug("WakeupException caught, background thread won't be interrupted");
+                    // swallow the wakeup exception to prevent killing the background thread.
                 }
             }
         } catch (final Throwable t) {
-            log.error(
-                "The background thread failed due to unexpected error",
-                t
-            );
-            if (t instanceof RuntimeException)
-                this.exception.set(Optional.of((RuntimeException) t));
-            else
-                this.exception.set(Optional.of(new RuntimeException(t)));
+            log.error("The background thread failed due to unexpected error", t);
+            throw new RuntimeException(t);
         } finally {
             close();
             log.debug("{} closed", getClass());
         }
     }
 
     /**
-     * Process event from a single poll
+     * Process event from a single poll. It performs the following tasks sequentially:
+     *  1. Try to poll and event from the queue, and try to process it.
+     *  2. Try to find Coordinator if needed
+     *  3. Try to send fetches.
+     *  4. Poll the networkClient for outstanding requests. If non: poll until next
+     *  iteration.
      */
     void runOnce() {
+        // TODO: we might not need the inflightEvent here

Review Comment:
   I'd be inclined to just drop `inflightEvent` here if we are already adding a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org