Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/06 00:03:54 UTC

[GitHub] [kafka] artemlivshits opened a new pull request, #12956: Kafka 14379 client preferred read replica

artemlivshits opened a new pull request, #12956:
URL: https://github.com/apache/kafka/pull/12956

   KAFKA-14379: consumer should refresh preferred read replica on update metadata
       
   The consumer (fetcher) used to refresh the preferred read replica only
   under three conditions:
       
   1. the consumer receives an OFFSET_OUT_OF_RANGE error
   2. the follower does not exist in the client's metadata (i.e., it is offline)
   3. metadata.max.age.ms has elapsed (5 min default)
       
   For other errors, it would keep reaching out to the possibly unavailable
   follower, and only after 5 minutes would it refresh the preferred read
   replica and go back to the leader.
       
   Another problem is that the client might have stale metadata and not
   send fetches to the preferred replica, even after the leader redirects
   it to the preferred replica.
       
   A specific example is a partition reassignment: the consumer gets
   NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the
   preferred read replica is not refreshed because the old follower is
   still online. The consumer keeps reaching out to the old follower until
   the preferred read replica expires.
       
   The consumer can instead refresh its preferred read replica whenever it
   makes a metadata update request, so when it receives, for example,
   NOT_LEADER_OR_FOLLOWER, it can find the new preferred read replica
   without waiting for the expiration.
       
   Generally, we rely on the leader to choose the correct preferred read
   replica and have the consumer fail fast (clear the preferred read
   replica cache) on errors and go back to the leader.
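
   As an illustration of the intended caching policy (this is a standalone
   sketch, not the Fetcher code; all names here are made up):

   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.ConcurrentHashMap;

   // Minimal sketch: the client remembers the preferred read replica per
   // partition, but any event that triggers a metadata update also clears
   // that cache, so the next fetch goes back to the leader and picks up a
   // fresh preference from its response.
   public class PreferredReplicaCache {
       private final Map<String, Integer> preferredReplicaByPartition = new ConcurrentHashMap<>();

       // Called when a fetch response from the leader advertises a preferred replica.
       public void set(String partition, int replicaId) {
           preferredReplicaByPartition.put(partition, replicaId);
       }

       // Called when selecting which node to fetch from.
       public Optional<Integer> get(String partition) {
           return Optional.ofNullable(preferredReplicaByPartition.get(partition));
       }

       // Called whenever the client requests a metadata update (e.g. after
       // NOT_LEADER_OR_FOLLOWER): fail fast and fall back to the leader.
       public void onMetadataUpdateRequested(String partition) {
           preferredReplicaByPartition.remove(partition);
       }
   }
   ```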
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: Kafka 14379 client preferred read replica

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1040244300


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1154,7 +1155,9 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
             } else {
                 log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," +
                           " using the leader instead.", nodeId, partition);
-                subscriptions.clearPreferredReadReplica(partition);
+                // Note that this condition may happen due to stale metadata, so we clear preferred replica and
+                // refresh metadata.
+                requestMetadataUpdate(partition);

Review Comment:
   This is new logic in addition to https://github.com/apache/kafka/pull/12897.  I'll add a unit test later.
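
   Judging from the diff and the comment above, `requestMetadataUpdate(partition)`
   combines the two actions (clear the cached preferred read replica and mark the
   metadata for refresh). A rough sketch of that shape, with the field names
   assumed from the surrounding Fetcher code rather than quoted from the PR:

   ```java
   // Assumed shape only; the actual helper lives in Fetcher.java and may differ.
   private void requestMetadataUpdate(TopicPartition topicPartition) {
       // Ask for a metadata refresh so the client learns the current replica set...
       metadata.requestUpdate();
       // ...and drop the stale preferred read replica so the next fetch goes back
       // to the leader, which will re-advertise the preferred replica.
       subscriptions.clearPreferredReadReplica(topicPartition);
   }
   ```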



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Integer> replicaIds = Collections.singletonList(leader.id());
+                List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());

Review Comment:
   From @hachikuji: Leaving the replication factor as implicit seems less than ideal. Perhaps we could make it an explicit argument?





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1043896712


##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Integer> replicaIds = Collections.singletonList(leader.id());
+                List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());
                 partitionMetadata.add(partitionSupplier.supply(
                         Errors.NONE, tp, Optional.of(leader.id()), Optional.ofNullable(epochSupplier.apply(tp)),
-                        replicaIds, replicaIds, replicaIds));
+                        replicaIds, replicaIds, Collections.emptyList()));

Review Comment:
   Not sure about this change; it looks like a rebase mistake.  Will take a closer look.





[GitHub] [kafka] hachikuji commented on pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12956:
URL: https://github.com/apache/kafka/pull/12956#issuecomment-1344961172

   Note there are some failures on `FetcherTest` in the builds above which seem related to this change.




[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1044843523


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)

Review Comment:
   For the new unit test I added a function to get the preferred replica, so all the explicit code to build and send a fetch request was moved into the new function.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1154,7 +1155,9 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
             } else {
                 log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," +
                           " using the leader instead.", nodeId, partition);
-                subscriptions.clearPreferredReadReplica(partition);
+                // Note that this condition may happen due to stale metadata, so we clear preferred replica and
+                // refresh metadata.
+                requestMetadataUpdate(partition);

Review Comment:
   Added unit test.  Verified that it failed without this change and passed with this change.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")
+
+    assertEquals(-1, getPreferredReplica)
+  }
+
+  @Test
+  def testFetchFromFollowerWithRoll(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    // Create consumer with client.rack = follower id.
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.CLIENT_RACK_CONFIG, followerBrokerId.toString)
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    try {
+      consumer.subscribe(List(topic).asJava)
+
+      // Wait until preferred replica is set to follower.
+      TestUtils.waitUntilTrue(() => {
+        getPreferredReplica == 1
+      }, "Preferred replica is not set")
+
+      // Produce and consume.
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+
+      // Shutdown follower, produce and consume should work.
+      brokers(followerBrokerId).shutdown()
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+
+      // Start the follower and wait until preferred replica is set to follower.
+      brokers(followerBrokerId).startup()
+      TestUtils.waitUntilTrue(() => {
+        getPreferredReplica == 1
+      }, "Preferred replica is not set")
+
+      // Produce and consume should still work.
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+    } finally {
+      consumer.close()
+    }
+  }
+
+  private def getPreferredReplica: Int = {

Review Comment:
   The previously explicit logic was moved from the unit test into this utility function.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")

Review Comment:
   This is actually not changed.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")
+
+    assertEquals(-1, getPreferredReplica)
+  }
+
+  @Test
+  def testFetchFromFollowerWithRoll(): Unit = {

Review Comment:
   This is the new unit test.





[GitHub] [kafka] dajac commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1045029878


##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
         PartitionInfo partitionInfo = partition(partition);
-        if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+        if (node != null && partitionInfo != null &&
+            !Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&

Review Comment:
   I think that it is better to keep the fix here as well. It is misleading otherwise. I agree about the helper though.





[GitHub] [kafka] dajac commented on pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12956:
URL: https://github.com/apache/kafka/pull/12956#issuecomment-1346125696

   Merged to trunk, 3.4 and 3.3.




[GitHub] [kafka] hachikuji commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1044913099


##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
         PartitionInfo partitionInfo = partition(partition);
-        if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+        if (node != null && partitionInfo != null &&
+            !Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&

Review Comment:
   We probably don't need to fix it here, but maybe we should create a little helper to iterate the array instead of creating a new list.
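
   A minimal sketch of the kind of helper being suggested (hypothetical name and
   placement, not part of this PR):

   ```java
   import java.util.Objects;

   // Hypothetical utility, shown only to illustrate the suggestion above:
   // check membership in an array without wrapping it in a List first.
   public final class ArrayContains {
       private ArrayContains() { }

       public static <T> boolean contains(T[] array, T target) {
           for (T element : array) {
               if (Objects.equals(element, target)) {
                   return true;
               }
           }
           return false;
       }
   }
   ```

   With something like this, the check could read
   `!ArrayContains.contains(partitionInfo.offlineReplicas(), node)` instead of
   allocating a list view on every call.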
   



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms

Review Comment:
   nit: comment seems not relevant to this test?





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1045132016


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms

Review Comment:
   Removed.



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Integer> replicaIds = Collections.singletonList(leader.id());
+                List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());
                 partitionMetadata.add(partitionSupplier.supply(
                         Errors.NONE, tp, Optional.of(leader.id()), Optional.ofNullable(epochSupplier.apply(tp)),
-                        replicaIds, replicaIds, replicaIds));
+                        replicaIds, replicaIds, Collections.emptyList()));

Review Comment:
   It's needed; otherwise all replicas are marked as offline, since replicaIds (which now contains every node) was being passed as the offline-replica list.



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +233,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
             for (int i = 0; i < numPartitions; i++) {
                 TopicPartition tp = new TopicPartition(topic, i);
                 Node leader = nodes.get(i % nodes.size());
-                List<Integer> replicaIds = Collections.singletonList(leader.id());
+                List<Integer> replicaIds = leaderOnly ? Collections.singletonList(leader.id()) : nodes.stream().map(Node::id).collect(Collectors.toList());

Review Comment:
   Updated to use an explicit flag to use all replicas vs just the leader.  @hachikuji 



##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
         PartitionInfo partitionInfo = partition(partition);
-        if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+        if (node != null && partitionInfo != null &&
+            !Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&

Review Comment:
   Looking at the source code, asList doesn't copy the data; it just creates a lightweight adapter over the same array, so it's O(1) and shouldn't be too much overhead.  Let me know if you still want the helper functions and I'll file a bug (if we're going to do that, I think we should build a generic library of such functions to be used everywhere).





[GitHub] [kafka] dajac commented on a diff in pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1045547158


##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
     public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
         Node node = nodeById(id);
         PartitionInfo partitionInfo = partition(partition);
-        if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+        if (node != null && partitionInfo != null &&
+            !Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&

Review Comment:
   Thanks for the clarification. It is actually not too bad like this. We can merge it as is and follow up if needed.





[GitHub] [kafka] dajac merged pull request #12956: KAFKA-14379: consumer should refresh preferred read replica on update metadata

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12956:
URL: https://github.com/apache/kafka/pull/12956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org