Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2020/12/30 21:31:00 UTC

[jira] [Created] (KAFKA-10894) Null replica nodes included in client quota callback Cluster

Jason Gustafson created KAFKA-10894:
---------------------------------------

             Summary: Null replica nodes included in client quota callback Cluster
                 Key: KAFKA-10894
                 URL: https://issues.apache.org/jira/browse/KAFKA-10894
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


I noticed an NPE in the client quota callback `updateClusterMetadata` due to the presence of null nodes inside a `PartitionInfo` instance. Here is the trace:
```
java.lang.NullPointerException
	at org.apache.kafka.common.PartitionInfo.formatNodeIds(PartitionInfo.java:143)
	at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:132)
	at java.base/java.lang.String.valueOf(String.java:3388)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
	at java.base/java.util.Collections$UnmodifiableCollection.toString(Collections.java:1042)
	at java.base/java.lang.String.valueOf(String.java:3388)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:167)
	at org.apache.kafka.common.Cluster.toString(Cluster.java:348)
```

After some debugging, I found that the array returned by `PartitionInfo.replicas` contained a null element. The javadoc for this method is the following:
```
    /**
     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
     */
    public Node[] replicas() {
        return replicas;
    }
```

The expectation is clearly that these arrays do not contain null elements. On the client, `MetadataResponse` uses the following logic to handle nodes that are not alive:
```
    private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) {
        return replicaIds.stream().map(replicaId -> {
            Node node = nodesById.get(replicaId);
            if (node == null)
                return new Node(replicaId, "", -1);
            return node;
        }).toArray(Node[]::new);
    }
```

However, inside `MetadataCache.getClusterMetadata` (which is used in the quota callback), we have the following logic:
```
    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
    def node(id: Integer): Node = nodes.get(id.toLong).orNull
    val partitions = getAllPartitions(snapshot)
      .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete }
      .map { case (tp, state) =>
        new PartitionInfo(tp.topic, tp.partition, node(state.leader),
          state.replicas.asScala.map(node).toArray,
          state.isr.asScala.map(node).toArray,
          state.offlineReplicas.asScala.map(node).toArray)
      }
```
Note specifically that the nested `node` method returns null if the replica is not alive, so null entries end up in the `PartitionInfo` arrays. It looks like we need to mirror the logic from `MetadataResponse` here.
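
A minimal sketch of that idea, written in Java to match the `MetadataResponse` snippet (the broker code itself is Scala, so this is not the actual patch). `Node` here is a hypothetical stand-in class rather than `org.apache.kafka.common.Node`, and `ReplicaResolver`, `resolve`, and `formatNodeIds` are illustrative names that do not exist in Kafka. It only shows that mapping an unknown replica ID to a placeholder keeps null out of the array, so code that dereferences every element no longer throws:
```java
import java.util.List;
import java.util.Map;

public class ReplicaResolver {
    // Hypothetical stand-in for org.apache.kafka.common.Node so the sketch compiles on its own.
    public static final class Node {
        public final int id;
        public final String host;
        public final int port;

        public Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    // Resolve each replica ID; an ID with no live node gets a placeholder
    // (empty host, port -1) instead of null.
    public static Node[] resolve(List<Integer> replicaIds, Map<Integer, Node> aliveNodesById) {
        return replicaIds.stream()
            .map(id -> {
                Node node = aliveNodesById.get(id);
                return node != null ? node : new Node(id, "", -1);
            })
            .toArray(Node[]::new);
    }

    // Simplified formatter (an assumption, not Kafka's implementation): it dereferences
    // every element, so a null entry would cause a NullPointerException here.
    public static String formatNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        for (int i = 0; i < nodes.length; i++) {
            if (i > 0)
                b.append(',');
            b.append(nodes[i].id);
        }
        return b.append(']').toString();
    }

    public static void main(String[] args) {
        // Broker 2 is a replica but is not in the alive-node map.
        Map<Integer, Node> alive = Map.of(1, new Node(1, "broker1", 9092));
        Node[] replicas = resolve(List.of(1, 2), alive);
        System.out.println(formatNodeIds(replicas)); // prints [1,2]
    }
}
```
The same resolution would presumably apply to the `isr` and `offlineReplicas` arrays as well, since they go through the same `node` lookup.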

--
This message was sent by Atlassian Jira
(v8.3.4#803005)