Posted to jira@kafka.apache.org by "chenchao (Jira)" <ji...@apache.org> on 2021/09/01 07:25:00 UTC
[jira] [Commented] (KAFKA-10208)
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler
returns null when the broker unexpectedly doesn't support the requireStable
flag for the request version, without any diagnostic information
[ https://issues.apache.org/jira/browse/KAFKA-10208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17407913#comment-17407913 ]
chenchao commented on KAFKA-10208:
----------------------------------
You should format your code so it is readable.
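For example, wrapping the snippet in Jira's {code} macro keeps the indentation intact (the lines below are just the opening of your own test case):

{code:java}
@Test
public void testCreateTopicAndCheckTheOffsite() throws ExecutionException, InterruptedException {
    String topicName = UUID.randomUUID().toString();
    String groupId = "DEMO_" + topicName;
    // ...
}
{code}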
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler returns null when the broker unexpectedly doesn't support the requireStable flag for the request version, without any diagnostic information
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10208
> URL: https://issues.apache.org/jira/browse/KAFKA-10208
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.7.0
> Reporter: lqjacklee
> Priority: Major
>
> When the 2.7.0 client sends requests to a broker whose version is 2.3.0, the OffsetAndMetadata comes back null and the key information is missing.
> I have created a test case, shown below:
> import java.lang.reflect.Field;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Set;
> import java.util.UUID;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.ClientResponse;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.CreateTopicsOptions;
> import org.apache.kafka.clients.admin.CreateTopicsResult;
> import org.apache.kafka.clients.admin.DeleteTopicsResult;
> import org.apache.kafka.clients.admin.ListOffsetsResult;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.admin.OffsetSpec;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
> import org.apache.kafka.clients.consumer.internals.RequestFuture;
> import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.Node;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.errors.TopicAuthorizationException;
> import org.apache.kafka.common.errors.TopicExistsException;
> import org.apache.kafka.common.errors.UnstableOffsetCommitException;
> import org.apache.kafka.common.message.FindCoordinatorRequestData;
> import org.apache.kafka.common.protocol.Errors;
> import org.apache.kafka.common.requests.FindCoordinatorRequest;
> import org.apache.kafka.common.requests.FindCoordinatorResponse;
> import org.apache.kafka.common.requests.OffsetFetchRequest;
> import org.apache.kafka.common.requests.OffsetFetchResponse;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.junit.Assert;
> import org.junit.Test;
>
> @Test
> public void testCreateTopicAndCheckTheOffsite() throws ExecutionException, InterruptedException {
>     String topicName = UUID.randomUUID().toString();
>     String groupId = "DEMO_" + topicName;
>     final Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>     String serializer = StringSerializer.class.getName();
>     String deserializer = StringDeserializer.class.getName();
>     props.put("auto.offset.reset", "latest");
>     props.put("session.timeout.ms", "30000");
>     props.put("key.deserializer", deserializer);
>     props.put("value.deserializer", deserializer);
>     props.put("key.serializer", serializer);
>     props.put("value.serializer", serializer);
>     props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
>     AdminClient adminClient = AdminClient.create(props);
>     boolean topicExist = false;
>     try {
>         NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
>         CreateTopicsOptions createTopicsOptions = new CreateTopicsOptions();
>         createTopicsOptions.timeoutMs(3000000);
>         final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic), createTopicsOptions);
>         createTopicsResult.values().get(topicName).get();
>     } catch (ExecutionException e) {
>         // createTopics failures arrive wrapped in an ExecutionException,
>         // so unwrap the cause to detect an already existing topic
>         if (e.getCause() instanceof TopicExistsException) {
>             topicExist = true;
>         } else {
>             throw e;
>         }
>     }
>     try {
>         List<TopicPartition> topicPartitions = new ArrayList<>();
>         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
>         // reach into the consumer via reflection to drive the network client directly
>         Field kafkaClientField = kafkaConsumer.getClass().getDeclaredField("client");
>         kafkaClientField.setAccessible(true);
>         ConsumerNetworkClient client = (ConsumerNetworkClient) kafkaClientField.get(kafkaConsumer);
>         FindCoordinatorRequest.Builder findCoordinatorRequest =
>             new FindCoordinatorRequest.Builder(
>                 new FindCoordinatorRequestData()
>                     .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
>                     .setKey(groupId));
>         Node node = client.leastLoadedNode();
>         Node coordinator;
>         RequestFuture<Node> requestCoordinatorFuture = client.send(node, findCoordinatorRequest)
>             .compose(new RequestFutureAdapter<ClientResponse, Node>() {
>                 @Override
>                 public void onFailure(RuntimeException e, RequestFuture<Node> future) {
>                     super.onFailure(e, future);
>                 }
>
>                 @Override
>                 public void onSuccess(ClientResponse value, RequestFuture<Node> future) {
>                     Node coordinator;
>                     FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) value.responseBody();
>                     Errors error = findCoordinatorResponse.error();
>                     if (error == Errors.NONE) {
>                         // use MAX_VALUE - node.id as the coordinator id to allow separate connections
>                         // for the coordinator in the underlying network client layer
>                         int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
>                         coordinator = new Node(
>                             coordinatorConnectionId,
>                             findCoordinatorResponse.data().host(),
>                             findCoordinatorResponse.data().port());
>                         client.tryConnect(coordinator);
>                         future.complete(coordinator);
>                     } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
>                         Assert.fail(error.message());
>                     } else {
>                         future.raise(error);
>                     }
>                 }
>             });
>         client.poll(requestCoordinatorFuture);
>         if (requestCoordinatorFuture.succeeded()) {
>             coordinator = requestCoordinatorFuture.value();
>         } else {
>             throw requestCoordinatorFuture.exception();
>         }
>         OffsetFetchRequest.Builder requestBuilder =
>             new OffsetFetchRequest.Builder(groupId, true, topicPartitions, true);
>         RequestFuture<Map<TopicPartition, OffsetAndMetadata>> topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder)
>             .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndMetadata>>() {
>                 @Override
>                 public void onSuccess(ClientResponse value, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
>                     OffsetFetchResponse response = (OffsetFetchResponse) value.responseBody();
>                     if (response.hasError()) {
>                         Errors error = response.error();
>                         if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
>                             // just retry
>                             future.raise(error);
>                         } else if (error == Errors.NOT_COORDINATOR) {
>                             // re-discover the coordinator and retry
>                             future.raise(error);
>                         } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
>                             Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + "");
>                         } else {
>                             future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
>                         }
>                         return;
>                     }
>                     Set<String> unauthorizedTopics = null;
>                     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
>                     Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
>                     for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
>                         TopicPartition tp = entry.getKey();
>                         OffsetFetchResponse.PartitionData partitionData = entry.getValue();
>                         if (partitionData.hasError()) {
>                             Errors error = partitionData.error;
>                             if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
>                                 future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
>                                 return;
>                             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
>                                 if (unauthorizedTopics == null) {
>                                     unauthorizedTopics = new HashSet<>();
>                                 }
>                                 unauthorizedTopics.add(tp.topic());
>                             } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
>                                 unstableTxnOffsetTopicPartitions.add(tp);
>                             } else {
>                                 future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
>                                     tp + ": " + error.message()));
>                                 return;
>                             }
>                         } else if (partitionData.offset >= 0) {
>                             // record the position with the offset (-1 indicates no committed offset to fetch);
>                             // if there's no committed offset, record as null
>                             offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
>                         } else {
>                             try {
>                                 HashMap<TopicPartition, OffsetSpec> offsetMap = new HashMap<>();
>                                 offsetMap.put(tp, OffsetSpec.earliest());
>                                 ListOffsetsResult listOffsetsResult = adminClient.listOffsets(offsetMap);
>                                 Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = listOffsetsResult.all().get();
>                                 ListOffsetsResult.ListOffsetsResultInfo offsetsResultInfo = topicPartitionListOffsetsResultInfoMap.get(tp);
>                                 offsets.put(tp, new OffsetAndMetadata(offsetsResultInfo.offset(), offsetsResultInfo.leaderEpoch(), ""));
>                             } catch (Exception e) {
>                                 Assert.fail(e.getMessage());
>                             }
>                         }
> Assert.fail("not found the topic and partition");
>                     }
>                     if (unauthorizedTopics != null) {
>                         future.raise(new TopicAuthorizationException(unauthorizedTopics));
>                     } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
>                         // just retry
>                         future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
>                     } else {
>                         future.complete(offsets);
>                     }
>                 }
>
>                 @Override
>                 public void onFailure(RuntimeException e, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
>                     super.onFailure(e, future);
>                 }
>             });
>         client.poll(topicPartitionMetadataRequestFuture);
>         if (topicPartitionMetadataRequestFuture.succeeded()) {
>             Map<TopicPartition, OffsetAndMetadata> value = topicPartitionMetadataRequestFuture.value();
>             Assert.assertNotNull(value);
>         } else {
>             Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage());
>         }
>     } catch (Exception e) {
>         Assert.fail(e.getMessage());
>     } finally {
>         if (topicExist) {
>             // clean up the topic this test worked with
>             List<String> topicsToDelete = Collections.singletonList(topicName);
>             DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
>             deleteTopicsResult.all().get();
>         }
>     }
> }
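Regarding the quoted description itself: the symptom can also be observed without reaching into consumer internals. Below is a minimal sketch, not the reporter's test; the broker address, topic, and group id are placeholders. It fetches committed offsets through the public KafkaConsumer API with read_committed isolation, which, as far as I can tell, is what makes a 2.7 consumer set the requireStable flag on OffsetFetch (KIP-447), and it guards against the null result explicitly:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // read_committed is what causes the consumer to request stable offsets
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        TopicPartition tp = new TopicPartition("demo-topic", 0);              // placeholder topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp), Duration.ofSeconds(30));
            OffsetAndMetadata om = committed.get(tp);
            if (om == null) {
                // the situation described above: a null entry and no hint as to why
                System.out.println("No committed offset returned for " + tp);
            } else {
                System.out.println("Committed offset for " + tp + " = " + om.offset());
            }
        }
    }
}
{code}

A null entry is normal when no offset has ever been committed, so the report's point stands: when the null is instead caused by the broker not supporting requireStable for the negotiated request version, the client should log which fallback it took rather than returning null entries silently.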
--
This message was sent by Atlassian Jira
(v8.3.4#803005)