Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2020/12/30 21:31:00 UTC
[jira] [Created] (KAFKA-10894) Null replica nodes included in client quota callback Cluster
Jason Gustafson created KAFKA-10894:
---------------------------------------
Summary: Null replica nodes included in client quota callback Cluster
Key: KAFKA-10894
URL: https://issues.apache.org/jira/browse/KAFKA-10894
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
I noticed an NPE in the client quota callback `updateClusterMetadata` due to the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:
```
java.lang.NullPointerException
    at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
    at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
    at java.base/java.lang.String.valueOf(String.java:3388)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
    at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
    at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
    at java.base/java.lang.String.valueOf(String.java:3388)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
    at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
```
After some debugging, I found that `PartitionInfo.replicas` had a null value. The javadoc for this field is the following:
```
/**
 * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
 */
public Node[] replicas() {
    return replicas;
}
```
The clear expectation is that the returned array does not contain null values. On the client, `MetadataResponse` uses the following logic to deal with replicas whose nodes are not alive, substituting a placeholder `Node` with an empty host and port -1:
```
private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
    return replicaIds.stream().map(replicaId -> {
        Node node = nodesById.get(replicaId);
        if (node == null)
            return new Node(replicaId, "", -1);
        return node;
    }).toArray(Node[]::new);
}
```
However, inside `MetadataCache.getClusterMetadata` (which is used in the quota callback), we have the following logic:
```
val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
def node(id: Integer): Node = nodes.get(id.toLong).orNull
val partitions = getAllPartitions(snapshot)
  .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
  .map { case (tp, state) =>
    new PartitionInfo(tp.topic, tp.partition, node(state.leader),
      state.replicas.asScala.map(node).toArray,
      state.isr.asScala.map(node).toArray,
      state.offlineReplicas.asScala.map(node).toArray)
  }
```
Note specifically that the nested `node` method returns null if the replica is not alive. It looks like we need to mimic the same logic from `MetadataResponse` here.
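As a rough illustration of the substitution described above, here is a self-contained Java sketch. The `Node` class is a simplified stand-in for `org.apache.kafka.common.Node`, and the class and method names (`ReplicaPlaceholderSketch`, `toNodeArray`) are hypothetical. The actual fix would live in the Scala `MetadataCache` code and may well look different.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplicaPlaceholderSketch {
    // Simplified stand-in for org.apache.kafka.common.Node (assumption: only these fields matter here).
    public static final class Node {
        public final int id;
        public final String host;
        public final int port;

        public Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port + " (id: " + id + ")";
        }
    }

    // Map replica ids to nodes, substituting an empty-host placeholder
    // (same shape as in MetadataResponse) instead of null for brokers that are not alive.
    public static Node[] toNodeArray(List<Integer> replicaIds, Map<Integer, Node> aliveNodes) {
        return replicaIds.stream().map(replicaId -> {
            Node node = aliveNodes.get(replicaId);
            if (node == null)
                return new Node(replicaId, "", -1);
            return node;
        }).toArray(Node[]::new);
    }

    public static void main(String[] args) {
        Map<Integer, Node> aliveNodes = new HashMap<>();
        aliveNodes.put(1, new Node(1, "broker1", 9092));

        // Broker 2 is a replica but is not alive, so it has no entry in aliveNodes.
        Node[] replicas = toNodeArray(Arrays.asList(1, 2), aliveNodes);

        // No element is null, so formatting the array cannot hit an NPE.
        System.out.println(Arrays.toString(replicas));
    }
}
```

With this shape, callers that iterate over the replica array (as `PartitionInfo.toString` does in the trace above) always see a non-null `Node`, and can still tell an offline replica apart by its empty host and port of -1.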