Posted to jira@kafka.apache.org by "yws (Jira)" <ji...@apache.org> on 2022/11/27 15:00:00 UTC

[jira] [Created] (KAFKA-14421) OffsetFetchRequest throws NPE Exception

yws created KAFKA-14421:
---------------------------

             Summary: OffsetFetchRequest throws NPE Exception
                 Key: KAFKA-14421
                 URL: https://issues.apache.org/jira/browse/KAFKA-14421
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.2.2, 0.10.2.1, 0.10.2.0
            Reporter: yws
         Attachments: image-2022-11-27-22-28-52-165.png, image-2022-11-27-22-41-45-358.png

When I use a 0.10.2 client to send a Metadata request to a 0.10.0 server, an NPE occurs:
 !image-2022-11-27-22-28-52-165.png! 

The NPE confused me at first, because sending a Metadata request by itself does not cause it. After troubleshooting, I found that NetworkClient#poll calls ConsumerNetworkClient#trySend, which in turn calls NetworkClient#doSend. While doSend is building the OffsetFetchRequest, the builder throws UnsupportedVersionException, because the 0.10.0 server does not support fetching offsets for all TopicPartitions:

{code:java}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        String nodeId = clientRequest.destination();
        ......
        AbstractRequest request = null;
        AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
        try {
            NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
            // Note: if versionInfo is null, we have no server version information. This would be
            // the case when sending the initial ApiVersionRequest which fetches the version
            // information itself.  It is also the case when discoverBrokerVersions is set to false.
            if (versionInfo == null) {
                if (discoverBrokerVersions && log.isTraceEnabled())
                    log.trace("No version information found when sending message of type {} to node {}. " +
                            "Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
            } else {
                short version = versionInfo.usableVersion(clientRequest.apiKey());
                builder.setVersion(version);
            }
            // The call to build may also throw UnsupportedVersionException, if there are essential
            // fields that cannot be represented in the chosen version.
            request = builder.build();
        } catch (UnsupportedVersionException e) {
            // If the version is not supported, skip sending the request over the wire.
            // Instead, simply add it to the local queue of aborted requests.
            log.debug("Version mismatch when attempting to send {} to {}",
                    clientRequest.toString(), clientRequest.destination(), e);
            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
                    clientRequest.callback(), clientRequest.destination(), now, now,
                    false, e, null);
            abortedSends.add(clientResponse);
            return;
        }
{code}

 !image-2022-11-27-22-41-45-358.png! 


Up to this point everything is as expected. Unfortunately, inside the catch block for UnsupportedVersionException, clientRequest.toString() calls requestBuilder#toString, which here is OffsetFetchRequest's Builder#toString. When the request targets ALL_TOPIC_PARTITIONS, partitions is null, so toString throws an unexpected NPE, and the otherwise normal MetadataRequest fails.


{code:java}
// Catch block in NetworkClient#doSend
catch (UnsupportedVersionException e) {
    log.debug("Version mismatch when attempting to send {} to {}",
            clientRequest.toString(), clientRequest.destination(), e);
    ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
            clientRequest.callback(), clientRequest.destination(), now, now,
            false, e, null);
    abortedSends.add(clientResponse);
    return;
}

// ClientRequest#toString()
public String toString() {
    return "ClientRequest(expectResponse=" + expectResponse +
        ", callback=" + callback +
        ", destination=" + destination +
        ", correlationId=" + correlationId +
        ", clientId=" + clientId +
        ", createdTimeMs=" + createdTimeMs +
        ", requestBuilder=" + requestBuilder +
        ")";
}

// OffsetFetchRequest's Builder#toString
public String toString() {
    StringBuilder bld = new StringBuilder();
    bld.append("(type=OffsetFetchRequest, ").
            append("groupId=").append(groupId).
            append(", partitions=").append(Utils.join(partitions, ",")). // causes the NPE when partitions is null
            append(")");
    return bld.toString();
}

{code}
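
To illustrate, here is a minimal, self-contained sketch of the failure and of one possible null-safe toString. It is not the actual Kafka code: the class name, the join helper (a simplified stand-in for Utils.join), and the "<ALL>" placeholder are assumptions made only for this example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch class; not part of the Kafka code base.
class OffsetFetchToStringSketch {

    // Simplified stand-in for Utils.join: a null collection causes an NPE.
    static <T> String join(Collection<T> items, String separator) {
        StringBuilder sb = new StringBuilder();
        Iterator<T> it = items.iterator(); // NPE is thrown here when items == null
        while (it.hasNext()) {
            sb.append(it.next());
            if (it.hasNext())
                sb.append(separator);
        }
        return sb.toString();
    }

    // Mirrors the shape of the Builder#toString quoted above.
    static String unsafeToString(String groupId, List<String> partitions) {
        return "(type=OffsetFetchRequest, groupId=" + groupId +
                ", partitions=" + join(partitions, ",") + ")";
    }

    // Null-safe variant: a null list is treated as "all topic partitions".
    static String safeToString(String groupId, List<String> partitions) {
        String partitionsString = partitions == null ? "<ALL>" : join(partitions, ",");
        return "(type=OffsetFetchRequest, groupId=" + groupId +
                ", partitions=" + partitionsString + ")";
    }

    public static void main(String[] args) {
        try {
            unsafeToString("group-1", null);
        } catch (NullPointerException e) {
            System.out.println("unsafe toString threw NPE");
        }
        System.out.println(safeToString("group-1", null));
        System.out.println(safeToString("group-1", Arrays.asList("topic-0", "topic-1")));
    }
}
```

With a guard like this, the log.debug call in the catch block could complete, so the request would be added to abortedSends as intended instead of the NPE escaping from doSend.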

I think this NPE is unexpected: when the broker does not support a specific protocol version, the client should surface UnsupportedVersionException, not an NPE. I found that this is fixed in 0.11 and later versions, but ALL_TOPIC_PARTITIONS support in OffsetFetchRequest was introduced in 0.10.2 ([KIP-88|https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update]). Therefore, I think it should also be fixed in the 0.10.2 client.

I look forward to any reply. Thanks!
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)