You are viewing a plain text version of this content. The canonical link was a hyperlink in the original archive page; it has not been preserved in this plain-text extraction.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/11 21:20:01 UTC
[kafka] branch 3.3 updated: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 44229581ed8 KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)
44229581ed8 is described below
commit 44229581ed8994416c41ac6584c150564185f0da
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Aug 11 14:08:37 2022 -0700
KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498)
Currently we do not set the replicaId in fetches from brokers to the metadata quorum. It is useful to do so since that allows us to debug replication using the `DescribeQuorum` API.
Reviewers: dengziming <de...@gmail.com>, José Armando García Sancio <js...@users.noreply.github.com>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 8 +-------
core/src/test/java/kafka/test/ClusterInstance.java | 13 +++++++++++++
.../kafka/test/junit/RaftClusterInvocationContext.java | 15 +++++++++++++++
.../java/kafka/test/junit/ZkClusterInvocationContext.java | 13 +++++++++++++
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala | 8 ++++----
.../unit/kafka/server/DescribeQuorumRequestTest.scala | 9 +++++----
6 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index cbb9f7b89bf..a44d9d8fe01 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
-import kafka.server.KafkaRaftServer.ControllerRole
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
@@ -181,12 +180,7 @@ class KafkaRaftManager[T](
val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
-
- val nodeId = if (config.processRoles.contains(ControllerRole)) {
- OptionalInt.of(config.nodeId)
- } else {
- OptionalInt.empty()
- }
+ val nodeId = OptionalInt.of(config.nodeId)
val client = new KafkaRaftClient(
recordSerde,
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index a7052857c36..9058508fa94 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
public interface ClusterInstance {
@@ -50,6 +51,18 @@ public interface ClusterInstance {
*/
ClusterConfig config();
+ /**
+ * Return the set of all controller IDs configured for this test. For kraft, this
+ * will return only the nodes which have the "controller" role enabled in `process.roles`.
+ * For zookeeper, this will return all broker IDs since they are all eligible controllers.
+ */
+ Set<Integer> controllerIds();
+
+ /**
+ * Return the set of all broker IDs configured for this test.
+ */
+ Set<Integer> brokerIds();
+
/**
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
* unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index cef71042d3f..5cd3ec3e246 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -192,6 +193,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return clusterConfig;
}
+ @Override
+ public Set<Integer> controllerIds() {
+ return controllers()
+ .map(controllerServer -> controllerServer.config().nodeId())
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<Integer> brokerIds() {
+ return brokers()
+ .map(brokerServer -> brokerServer.config().nodeId())
+ .collect(Collectors.toSet());
+ }
+
@Override
public KafkaClusterTestKit getUnderlying() {
return clusterReference.get();
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index d8375b01279..18a85e2d7bf 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -257,6 +258,18 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
return config;
}
+ @Override
+ public Set<Integer> controllerIds() {
+ return brokerIds();
+ }
+
+ @Override
+ public Set<Integer> brokerIds() {
+ return servers()
+ .map(brokerServer -> brokerServer.config().nodeId())
+ .collect(Collectors.toSet());
+ }
+
@Override
public IntegrationTestHarness getUnderlying() {
return clusterReference.get();
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index f8fac503d6e..9d7a93db94c 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -82,23 +82,23 @@ class RaftManagerTest {
}
@Test
- def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
+ def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1")
- assertFalse(raftManager.client.nodeId.isPresent)
+ assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}
@Test
def testNodeIdPresentIfControllerRoleOnly(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1")
- assertTrue(raftManager.client.nodeId.getAsInt == 1)
+ assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}
@Test
def testNodeIdPresentIfColocated(): Unit = {
val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1")
- assertTrue(raftManager.client.nodeId.getAsInt == 1)
+ assertEquals(1, raftManager.client.nodeId.getAsInt)
raftManager.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index f8da00f10e8..b53004b2eaf 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -80,11 +80,12 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
assertTrue(leaderState.logEndOffset > 0)
val voterData = partitionData.currentVoters.asScala
+ assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);
+
val observerData = partitionData.observers.asScala
- assertEquals(1, voterData.size)
- assertEquals(0, observerData.size)
- voterData.foreach { state =>
- assertTrue(0 < state.replicaId)
+ assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);
+
+ (voterData ++ observerData).foreach { state =>
assertTrue(0 < state.logEndOffset)
assertEquals(-1, state.lastFetchTimestamp)
assertEquals(-1, state.lastCaughtUpTimestamp)