You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Rajini Sivaram (Jira)" <ji...@apache.org> on 2021/06/15 08:58:00 UTC
[jira] [Resolved] (KAFKA-12948) NetworkClient.close(node) with node
in connecting state makes NetworkClient unusable
[ https://issues.apache.org/jira/browse/KAFKA-12948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-12948.
------------------------------------
Fix Version/s: 3.0.0
Reviewer: David Jacot
Resolution: Fixed
> NetworkClient.close(node) with node in connecting state makes NetworkClient unusable
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-12948
> URL: https://issues.apache.org/jira/browse/KAFKA-12948
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 2.8.0, 2.7.1
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Critical
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> `NetworkClient.close(node)` closes the node and removes it from `ClusterConnectionStates.nodeState`, but not from `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` invocations throw IllegalStateException and this leaves the NetworkClient in an unusable state until the node is removed from connectionNodes or added to nodeState. We don't use `NetworkClient.close(node)` in clients, but we use it in clients started by brokers for replica fetcher and controller. Since brokers use NetworkClientUtils.isReady() before establishing connections and this invokes poll(), the NetworkClient never recovers.
> Exception stack trace:
> {code:java}
> java.lang.IllegalStateException: No entry found for connection 0
> at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
> at org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
> at org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
> at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
> at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
> at org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)