You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/11 21:20:01 UTC

[kafka] branch 3.3 updated: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 44229581ed8 KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)
44229581ed8 is described below

commit 44229581ed8994416c41ac6584c150564185f0da
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Aug 11 14:08:37 2022 -0700

    KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)
    
    Currently we do not set the replicaId in fetches from brokers to the metadata quorum. It is useful to do so since that allows us to debug replication using the `DescribeQuorum` API.
    
    Reviewers: dengziming <de...@gmail.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
 core/src/main/scala/kafka/raft/RaftManager.scala          |  8 +-------
 core/src/test/java/kafka/test/ClusterInstance.java        | 13 +++++++++++++
 .../kafka/test/junit/RaftClusterInvocationContext.java    | 15 +++++++++++++++
 .../java/kafka/test/junit/ZkClusterInvocationContext.java | 13 +++++++++++++
 core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala |  8 ++++----
 .../unit/kafka/server/DescribeQuorumRequestTest.scala     |  9 +++++----
 6 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index cbb9f7b89bf..a44d9d8fe01 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture
 import kafka.log.UnifiedLog
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import kafka.server.{KafkaConfig, MetaProperties}
-import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.utils.timer.SystemTimer
 import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
@@ -181,12 +180,7 @@ class KafkaRaftManager[T](
     val expirationTimer = new SystemTimer("raft-expiration-executor")
     val expirationService = new TimingWheelExpirationService(expirationTimer)
     val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
-
-    val nodeId = if (config.processRoles.contains(ControllerRole)) {
-      OptionalInt.of(config.nodeId)
-    } else {
-      OptionalInt.empty()
-    }
+    val nodeId = OptionalInt.of(config.nodeId)
 
     val client = new KafkaRaftClient(
       recordSerde,
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index a7052857c36..9058508fa94 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 
 public interface ClusterInstance {
 
@@ -50,6 +51,18 @@ public interface ClusterInstance {
      */
     ClusterConfig config();
 
+    /**
+     * Return the set of all controller IDs configured for this test. For kraft, this
+     * will return only the nodes which have the "controller" role enabled in `process.roles`.
+     * For zookeeper, this will return all broker IDs since they are all eligible controllers.
+     */
+    Set<Integer> controllerIds();
+
+    /**
+     * Return the set of all broker IDs configured for this test.
+     */
+    Set<Integer> brokerIds();
+
     /**
      * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
      * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index cef71042d3f..5cd3ec3e246 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -192,6 +193,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             return clusterConfig;
         }
 
+        @Override
+        public Set<Integer> controllerIds() {
+            return controllers()
+                .map(controllerServer -> controllerServer.config().nodeId())
+                .collect(Collectors.toSet());
+        }
+
+        @Override
+        public Set<Integer> brokerIds() {
+            return brokers()
+                .map(brokerServer -> brokerServer.config().nodeId())
+                .collect(Collectors.toSet());
+        }
+
         @Override
         public KafkaClusterTestKit getUnderlying() {
             return clusterReference.get();
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index d8375b01279..18a85e2d7bf 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -257,6 +258,18 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
             return config;
         }
 
+        @Override
+        public Set<Integer> controllerIds() {
+            return brokerIds();
+        }
+
+        @Override
+        public Set<Integer> brokerIds() {
+            return servers()
+                .map(brokerServer -> brokerServer.config().nodeId())
+                .collect(Collectors.toSet());
+        }
+
         @Override
         public IntegrationTestHarness getUnderlying() {
             return clusterReference.get();
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index f8fac503d6e..9d7a93db94c 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -82,23 +82,23 @@ class RaftManagerTest {
   }
 
   @Test
-  def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
+  def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
     val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1")
-    assertFalse(raftManager.client.nodeId.isPresent)
+    assertEquals(1, raftManager.client.nodeId.getAsInt)
     raftManager.shutdown()
   }
 
   @Test
   def testNodeIdPresentIfControllerRoleOnly(): Unit = {
     val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1")
-    assertTrue(raftManager.client.nodeId.getAsInt == 1)
+    assertEquals(1, raftManager.client.nodeId.getAsInt)
     raftManager.shutdown()
   }
 
   @Test
   def testNodeIdPresentIfColocated(): Unit = {
     val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1")
-    assertTrue(raftManager.client.nodeId.getAsInt == 1)
+    assertEquals(1, raftManager.client.nodeId.getAsInt)
     raftManager.shutdown()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index f8da00f10e8..b53004b2eaf 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -80,11 +80,12 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       assertTrue(leaderState.logEndOffset > 0)
 
       val voterData = partitionData.currentVoters.asScala
+      assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);
+
       val observerData = partitionData.observers.asScala
-      assertEquals(1, voterData.size)
-      assertEquals(0, observerData.size)
-      voterData.foreach { state =>
-        assertTrue(0 < state.replicaId)
+      assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);
+
+      (voterData ++ observerData).foreach { state =>
         assertTrue(0 < state.logEndOffset)
         assertEquals(-1, state.lastFetchTimestamp)
         assertEquals(-1, state.lastCaughtUpTimestamp)