You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/12 02:57:01 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10281: KAFKA-12432: AdminClient should time out nodes that are never ready

hachikuji commented on a change in pull request #10281:
URL: https://github.com/apache/kafka/pull/10281#discussion_r592865485



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1136,17 +1185,12 @@ private void timeoutCallsInFlight(TimeoutProcessor processor) {
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
-                    if (call.aborted) {
-                        log.warn("Aborted call {} is still in callsInFlight.", call);
-                    } else {
-                        log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                        call.aborted = true;
-                        client.disconnect(nodeId);
-                        numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure. Because the connection
-                        // has been closed, the calls should be returned by the next client#poll(),
-                        // and handled at that point.
-                    }
+                    log.debug("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);

Review comment:
       I wonder if we can raise this to info? This is a really important event to see in the logs, and we are always left guessing about it when the user does not have debug logging enabled. The frequency at the rate of the request timeout should keep it from being too spammy.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;

Review comment:
       Maybe I am missing something here. Why do we need to continue? Do we not support multiple in-flight requests? If not, then why does `callsInFlight` map to a list?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);

Review comment:
       It seems like it would be quite intuitive to use `socket.connection.setup.timeout.ms`. Is it because of the complexity of `socket.connection.setup.timeout.max.ms` that we don't do this? Basically we don't know how long the network client itself is planning to wait for the connection to be established.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1089,29 +1106,61 @@ private long sendEligibleCalls(long now) {
                     continue;
                 }
                 Node node = entry.getKey();
+                if (!callsInFlight.getOrDefault(node.idString(), Collections.emptyList()).isEmpty()) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                 if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                     long nodeTimeout = client.pollDelayMs(node, now);
                     pollTimeout = Math.min(pollTimeout, nodeTimeout);
                     log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                     continue;
                 }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);

Review comment:
       So I guess the intent here is to count the "ready time" toward the request timeout. I think that's reasonable. It means that request timeout represents the maximum time we will commit to trying to get a response from a particular node before retrying. It might be worth a comment to explain this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org