You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/05 15:26:20 UTC
[kafka] branch 3.0 updated: KAFKA-13168: KRaft observers should not have a replica id (#11178)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 765d200 KAFKA-13168: KRaft observers should not have a replica id (#11178)
765d200 is described below
commit 765d2006a32d7aaead676960e441b4c00860e0ab
Author: Ryan Dielhenn <rd...@confluent.io>
AuthorDate: Thu Aug 5 08:18:41 2021 -0700
KAFKA-13168: KRaft observers should not have a replica id (#11178)
---
core/src/main/scala/kafka/raft/RaftManager.scala | 9 +++-
.../main/scala/kafka/tools/TestRaftServer.scala | 2 +-
.../scala/unit/kafka/raft/RaftManagerTest.scala | 63 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 7e63a25..806b62d 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture
import kafka.log.Log
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.server.KafkaRaftServer.ControllerRole
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
@@ -180,6 +181,12 @@ class KafkaRaftManager[T](
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
+ val nodeId = if (config.processRoles.contains(ControllerRole)) {
+ OptionalInt.of(config.nodeId)
+ } else {
+ OptionalInt.empty()
+ }
+
val client = new KafkaRaftClient(
recordSerde,
netChannel,
@@ -190,7 +197,7 @@ class KafkaRaftManager[T](
expirationService,
logContext,
metaProperties.clusterId,
- OptionalInt.of(config.nodeId),
+ nodeId,
raftConfig
)
client.initialize()
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 774805e..5099138 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -290,7 +290,7 @@ object TestRaftServer extends Logging {
}
}
- private class ByteArraySerde extends RecordSerde[Array[Byte]] {
+ class ByteArraySerde extends RecordSerde[Array[Byte]] {
override def recordSize(data: Array[Byte], serializationCache: ObjectSerializationCache): Int = {
data.length
}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 3afb75b..40256ae 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -17,15 +17,78 @@
package kafka.raft
import java.util.concurrent.CompletableFuture
+import java.util.Properties
import kafka.raft.KafkaRaftManager.RaftIoThread
+import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.tools.TestRaftServer.ByteArraySerde
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.KafkaRaftClient
+import org.apache.kafka.raft.RaftConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
class RaftManagerTest {
+ private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+ def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+ val props = new Properties
+ props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
+ props.setProperty(KafkaConfig.NodeIdProp, nodeId)
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
+ if (processRoles.contains("broker"))
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ new KafkaConfig(props)
+ }
+
+ val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+ val topicId = new Uuid(0L, 2L)
+ val metaProperties = MetaProperties(
+ clusterId = Uuid.randomUuid.toString,
+ nodeId = config.nodeId
+ )
+
+ new KafkaRaftManager[Array[Byte]](
+ metaProperties,
+ config,
+ new ByteArraySerde,
+ new TopicPartition("__taft_id_test", 0),
+ topicId,
+ Time.SYSTEM,
+ new Metrics(Time.SYSTEM),
+ Option.empty,
+ CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+ )
+ }
+
+ @Test
+ def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
+ val raftManager = instantiateRaftManagerWithConfigs("broker", "1")
+ assertFalse(raftManager.client.nodeId.isPresent)
+ raftManager.shutdown()
+ }
+
+ @Test
+ def testNodeIdPresentIfControllerRoleOnly(): Unit = {
+ val raftManager = instantiateRaftManagerWithConfigs("controller", "1")
+ assertTrue(raftManager.client.nodeId.getAsInt == 1)
+ raftManager.shutdown()
+ }
+
+ @Test
+ def testNodeIdPresentIfColocated(): Unit = {
+ val raftManager = instantiateRaftManagerWithConfigs("controller,broker", "1")
+ assertTrue(raftManager.client.nodeId.getAsInt == 1)
+ raftManager.shutdown()
+ }
+
@Test
def testShutdownIoThread(): Unit = {
val raftClient = mock(classOf[KafkaRaftClient[String]])