Posted to jira@kafka.apache.org by "Ismael Juma (Jira)" <ji...@apache.org> on 2021/06/14 21:46:00 UTC

[jira] [Commented] (KAFKA-12948) NetworkClient.close(node) with node in connecting state makes NetworkClient unusable

    [ https://issues.apache.org/jira/browse/KAFKA-12948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363227#comment-17363227 ] 

Ismael Juma commented on KAFKA-12948:
-------------------------------------

Good catch. So, this regressed in 2.7.0?

> NetworkClient.close(node) with node in connecting state makes NetworkClient unusable
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12948
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12948
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 2.7.2, 2.8.1
>
>
> `NetworkClient.close(node)` closes the node and removes it from `ClusterConnectionStates.nodeState`, but not from `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` invocations throw `IllegalStateException`, which leaves the NetworkClient unusable until the node is either removed from `connectingNodes` or added back to `nodeState`. Regular clients don't call `NetworkClient.close(node)`, but the clients that brokers start for the replica fetcher and the controller do. Since brokers call `NetworkClientUtils.isReady()` before establishing connections, and this invokes `poll()`, the NetworkClient never recovers.
> Exception stack trace:
> {code:java}
>     java.lang.IllegalStateException: No entry found for connection 0
>         at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
>         at org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
>         at org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
>         at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
>         at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>         at org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
>         at org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>         at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
> {code}
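
The failure mode in the quoted description can be reproduced in miniature. The sketch below is a hypothetical, simplified model, not the real `ClusterConnectionStates`: the class `ConnectionStatesSketch` and its methods are made up for illustration, and `removeConsistent` shows one possible way to keep the two collections in sync, not necessarily the fix that was committed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the two collections named in the report. */
class ConnectionStatesSketch {
    private final Map<String, String> nodeState = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    /** A node that starts connecting is tracked in both collections. */
    void connecting(String id) {
        nodeState.put(id, "CONNECTING");
        connectingNodes.add(id);
    }

    /** Models the reported behaviour: only nodeState is cleaned up. */
    void removeBuggy(String id) {
        nodeState.remove(id);
    }

    /** One possible repair (an assumption): clean up both collections. */
    void removeConsistent(String id) {
        nodeState.remove(id);
        connectingNodes.remove(id);
    }

    /**
     * Stand-in for the connection-setup timeout scan that poll() runs:
     * every id in connectingNodes must still have an entry in nodeState.
     * Returns the number of connecting nodes that were checked.
     */
    int checkConnectingNodes() {
        int checked = 0;
        for (String id : connectingNodes) {
            if (!nodeState.containsKey(id)) {
                throw new IllegalStateException("No entry found for connection " + id);
            }
            checked++;
        }
        return checked;
    }
}
```

With `removeBuggy`, every later call to `checkConnectingNodes()` throws, because the stale id never leaves `connectingNodes`. That mirrors why repeated `poll()` calls cannot recover on their own.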


