You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "lqjacklee (Jira)" <ji...@apache.org> on 2020/06/28 05:19:00 UTC

[jira] [Created] (KAFKA-10208) org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler returns null, without any error information, when the broker unexpectedly doesn't support the requireStable flag on its version

lqjacklee created KAFKA-10208:
---------------------------------

             Summary: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler returns null, without any error information, when the broker unexpectedly doesn't support the requireStable flag on its version
                 Key: KAFKA-10208
                 URL: https://issues.apache.org/jira/browse/KAFKA-10208
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.7.0
            Reporter: lqjacklee


When a 2.7.0 client sends a request to a broker running version 2.3.0, the returned OffsetAndMetadata is null and the key information is missing.

I have created the following test case:

/**
 * Reproduction test for KAFKA-10208.
 *
 * <p>Steps, all against a broker expected at {@code localhost:9092}:
 * <ol>
 *   <li>create a single-partition topic with a random UUID name;</li>
 *   <li>reach into the consumer's private {@code client} field to obtain its
 *       {@code ConsumerNetworkClient};</li>
 *   <li>look up the group coordinator with a {@code FindCoordinatorRequest};</li>
 *   <li>send an {@code OffsetFetchRequest} to that coordinator and assert that the
 *       resulting offset map is not {@code null}.</li>
 * </ol>
 *
 * <p>Both clients are closed when the test ends, and the topic is deleted again if (and only
 * if) this test created it.
 *
 * @throws ExecutionException   if creating or deleting the topic fails for a reason other than
 *                              the topic already existing
 * @throws InterruptedException if the thread is interrupted while waiting on an admin result
 */
@Test
    public void testCreateTopicAndCheckTheOffsite() throws ExecutionException, InterruptedException {
        String topicName = UUID.randomUUID().toString();
        String groupId = "DEMO_" + topicName;
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        String serializer = StringSerializer.class.getName();
        String deserializer = StringDeserializer.class.getName();
        props.put("auto.offset.reset", "latest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // try-with-resources: the admin client must be closed even when an assertion fails.
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Tracks whether THIS test created the topic, so cleanup never deletes a topic it does not own.
            boolean topicCreated = false;
            try {
                NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
                CreateTopicsOptions createTopicsOptions = new CreateTopicsOptions();
                createTopicsOptions.timeoutMs(3000000);
                final CreateTopicsResult createTopicsResult =
                        adminClient.createTopics(Collections.singleton(newTopic), createTopicsOptions);
                createTopicsResult.values().get(topicName).get();
                topicCreated = true;
            } catch (ExecutionException e) {
                // The future reports broker-side failures wrapped in ExecutionException, so the
                // "already exists" case has to be detected on the cause; anything else is fatal.
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
            }

            // The consumer is a resource of this try, so it is closed before the catch/finally below run.
            try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
                // NOTE(review): this list is never populated, so the offset fetch below asks for no
                // partitions at all — confirm this is what the reproduction intends.
                List<TopicPartition> topicPartitions = new ArrayList<>();

                // The network client is a private field of KafkaConsumer; reflection is the only way in.
                Field kafkaClientField = kafkaConsumer.getClass().getDeclaredField("client");
                kafkaClientField.setAccessible(true);
                ConsumerNetworkClient client = (ConsumerNetworkClient) kafkaClientField.get(kafkaConsumer);

                FindCoordinatorRequest.Builder findCoordinatorRequest =
                        new FindCoordinatorRequest.Builder(
                                new FindCoordinatorRequestData()
                                        .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
                                        .setKey(groupId));
                Node node = client.leastLoadedNode();
                RequestFuture<Node> requestCoordinatorFuture = client.send(node, findCoordinatorRequest)
                        .compose(new RequestFutureAdapter<ClientResponse, Node>() {
                            @Override
                            public void onSuccess(ClientResponse value, RequestFuture<Node> future) {
                                FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) value.responseBody();
                                Errors error = findCoordinatorResponse.error();
                                if (error == Errors.NONE) {
                                    // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                                    // for the coordinator in the underlying network client layer
                                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();

                                    Node discovered = new Node(
                                            coordinatorConnectionId,
                                            findCoordinatorResponse.data().host(),
                                            findCoordinatorResponse.data().port());
                                    client.tryConnect(discovered);
                                    future.complete(discovered);
                                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                                    Assert.fail(error.message());
                                } else {
                                    future.raise(error);
                                }
                            }
                        });

                client.poll(requestCoordinatorFuture);
                if (!requestCoordinatorFuture.succeeded()) {
                    throw requestCoordinatorFuture.exception();
                }
                Node coordinator = requestCoordinatorFuture.value();

                // NOTE(review): the two boolean arguments are presumably requireStable and
                // throwOnFetchStableOffsetsUnsupported — confirm against the Builder signature.
                OffsetFetchRequest.Builder requestBuilder =
                        new OffsetFetchRequest.Builder(groupId, true, topicPartitions, true);

                RequestFuture<Map<TopicPartition, OffsetAndMetadata>> topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder)
                        .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndMetadata>>() {
                            @Override
                            public void onSuccess(ClientResponse value, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
                                OffsetFetchResponse response = (OffsetFetchResponse) value.responseBody();

                                if (response.hasError()) {
                                    Errors error = response.error();
                                    if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                                        // just retry
                                        future.raise(error);
                                    } else if (error == Errors.NOT_COORDINATOR) {
                                        // re-discover the coordinator and retry
                                        future.raise(error);
                                    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                                        Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + "");
                                    } else {
                                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                                    }
                                    return;
                                }

                                Set<String> unauthorizedTopics = null;
                                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
                                Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
                                for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                    TopicPartition tp = entry.getKey();
                                    OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                                    if (partitionData.hasError()) {
                                        Errors error = partitionData.error;
                                        if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                                            future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                                            return;
                                        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                                            if (unauthorizedTopics == null) {
                                                unauthorizedTopics = new HashSet<>();
                                            }
                                            unauthorizedTopics.add(tp.topic());
                                        } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
                                            unstableTxnOffsetTopicPartitions.add(tp);
                                        } else {
                                            future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
                                                    tp + ": " + error.message()));
                                            return;
                                        }
                                    } else if (partitionData.offset >= 0) {
                                        // a non-negative offset is a committed offset; record it as-is
                                        offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
                                    } else {
                                        // negative offset: fall back to the partition's earliest offset via the admin client
                                        try {
                                            HashMap<TopicPartition, OffsetSpec> offsetMap = new HashMap<>();
                                            offsetMap.put(tp, OffsetSpec.earliest());
                                            ListOffsetsResult listOffsetsResult = adminClient.listOffsets(offsetMap);
                                            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = listOffsetsResult.all().get();
                                            ListOffsetsResult.ListOffsetsResultInfo offsetsResultInfo = topicPartitionListOffsetsResultInfoMap.get(tp);

                                            offsets.put(tp, new OffsetAndMetadata(offsetsResultInfo.offset(), offsetsResultInfo.leaderEpoch(), ""));
                                        } catch (Exception e) {
                                            // keep the cause so the failure report shows where the lookup broke
                                            throw new AssertionError(e.getMessage(), e);
                                        }
                                    }
                                    // NOTE(review): this runs for every partition entry that reaches the end of the
                                    // loop body, including ones whose offset was just recorded. Kept unchanged
                                    // because it is part of the reported reproduction — confirm the intent.
                                    Assert.fail("not found the topic and partition");
                                }

                                if (unauthorizedTopics != null) {
                                    future.raise(new TopicAuthorizationException(unauthorizedTopics));
                                } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
                                    // just retry
                                    future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
                                } else {
                                    future.complete(offsets);
                                }
                            }
                        });

                client.poll(topicPartitionMetadataRequestFuture);

                if (topicPartitionMetadataRequestFuture.succeeded()) {
                    Map<TopicPartition, OffsetAndMetadata> value = topicPartitionMetadataRequestFuture.value();
                    Assert.assertNotNull(value);
                } else {
                    Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage());
                }

            } catch (Exception e) {
                // Fail the test but keep the original exception as the cause; a bare
                // Assert.fail(e.getMessage()) would drop the stack trace.
                throw new AssertionError(e.getMessage(), e);
            } finally {
                if (topicCreated) {
                    // remove the topic this test created so repeated runs do not accumulate topics
                    adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
                }
            }
        }
    }






--
This message was sent by Atlassian Jira
(v8.3.4#803005)