Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/05 15:26:20 UTC

[kafka] branch 3.0 updated: KAFKA-13168: KRaft observers should not have a replica id (#11178)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 765d200  KAFKA-13168: KRaft observers should not have a replica id (#11178)
765d200 is described below

commit 765d2006a32d7aaead676960e441b4c00860e0ab
Author: Ryan Dielhenn <rd...@confluent.io>
AuthorDate: Thu Aug 5 08:18:41 2021 -0700

    KAFKA-13168: KRaft observers should not have a replica id (#11178)
---
 core/src/main/scala/kafka/raft/RaftManager.scala   |  9 +++-
 .../main/scala/kafka/tools/TestRaftServer.scala    |  2 +-
 .../scala/unit/kafka/raft/RaftManagerTest.scala    | 63 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 2 deletions(-)
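
Editor's gloss on the change: a KRaft node acts as a quorum voter only when its process.roles include the controller role, so only such nodes should hand a replica id to the Raft client; broker-only nodes join the quorum as observers and now pass an empty id instead. Below is a minimal, self-contained sketch of that selection logic. ProcessRole, BrokerRole, ControllerRole, NodeConfig, raftNodeId and RoleBasedNodeIdSketch are simplified stand-ins for illustration, not Kafka's real classes.

    import java.util.OptionalInt

    // Simplified stand-ins for Kafka's process-role configuration (illustration only).
    sealed trait ProcessRole
    case object BrokerRole extends ProcessRole
    case object ControllerRole extends ProcessRole
    final case class NodeConfig(nodeId: Int, processRoles: Set[ProcessRole])

    object RoleBasedNodeIdSketch extends App {
      // Only nodes running the controller role are quorum voters and carry a replica id;
      // broker-only nodes participate as observers, so an empty id is handed to the client.
      def raftNodeId(config: NodeConfig): OptionalInt =
        if (config.processRoles.contains(ControllerRole)) OptionalInt.of(config.nodeId)
        else OptionalInt.empty()

      assert(!raftNodeId(NodeConfig(nodeId = 1, processRoles = Set(BrokerRole))).isPresent)
      assert(raftNodeId(NodeConfig(nodeId = 1, processRoles = Set(BrokerRole, ControllerRole))).getAsInt == 1)
      println("broker-only observer: no replica id; controller: keeps its configured id")
    }

The RaftManager.scala hunk below applies exactly this selection using the real KafkaConfig and KafkaRaftServer.ControllerRole before constructing the KafkaRaftClient.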

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 7e63a25..806b62d 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture
 import kafka.log.Log
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.utils.timer.SystemTimer
 import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
@@ -180,6 +181,12 @@ class KafkaRaftManager[T](
     val expirationService = new TimingWheelExpirationService(expirationTimer)
     val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
 
+    val nodeId = if (config.processRoles.contains(ControllerRole)) {
+      OptionalInt.of(config.nodeId)
+    } else {
+      OptionalInt.empty()
+    }
+
     val client = new KafkaRaftClient(
       recordSerde,
       netChannel,
@@ -190,7 +197,7 @@ class KafkaRaftManager[T](
       expirationService,
       logContext,
       metaProperties.clusterId,
-      OptionalInt.of(config.nodeId),
+      nodeId,
       raftConfig
     )
     client.initialize()
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 774805e..5099138 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -290,7 +290,7 @@ object TestRaftServer extends Logging {
     }
   }
 
-  private class ByteArraySerde extends RecordSerde[Array[Byte]] {
+  class ByteArraySerde extends RecordSerde[Array[Byte]] {
     override def recordSize(data: Array[Byte], serializationCache: ObjectSerializationCache): Int = {
       data.length
     }
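
Editor's gloss on the TestRaftServer.scala hunk above: ByteArraySerde drops its private modifier so the new RaftManagerTest can reuse it (see the import of kafka.tools.TestRaftServer.ByteArraySerde in the test diff below). A small sketch of the reuse the wider visibility enables; it assumes Kafka's core test classpath and SerdeVisibilitySketch is a hypothetical name.

    import kafka.tools.TestRaftServer.ByteArraySerde

    object SerdeVisibilitySketch {
      // Before this commit the class was private to TestRaftServer, so this line would not compile elsewhere.
      val serde = new ByteArraySerde
    }
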
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 3afb75b..40256ae 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -17,15 +17,78 @@
 package kafka.raft
 
 import java.util.concurrent.CompletableFuture
+import java.util.Properties
 
 import kafka.raft.KafkaRaftManager.RaftIoThread
+import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.tools.TestRaftServer.ByteArraySerde
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
+import org.apache.kafka.raft.RaftConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
 class RaftManagerTest {
 
+  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+      val props = new Properties
+      props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
+      props.setProperty(KafkaConfig.NodeIdProp, nodeId)
+      props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
+      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
+      if (processRoles.contains("broker"))
+        props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+        props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+      new KafkaConfig(props)
+    }
+
+    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val topicId = new Uuid(0L, 2L)
+    val metaProperties = MetaProperties(
+      clusterId = Uuid.randomUuid.toString,
+      nodeId = config.nodeId
+    )
+
+    new KafkaRaftManager[Array[Byte]](
+      metaProperties,
+      config,
+      new ByteArraySerde,
+      new TopicPartition("__taft_id_test", 0),
+      topicId,
+      Time.SYSTEM,
+      new Metrics(Time.SYSTEM),
+      Option.empty,
+      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+    )
+  }
+
+  @Test
+  def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
+    val raftManager = instantiateRaftManagerWithConfigs("broker", "1")
+    assertFalse(raftManager.client.nodeId.isPresent)
+    raftManager.shutdown()
+  }
+
+  @Test
+  def testNodeIdPresentIfControllerRoleOnly(): Unit = {
+    val raftManager = instantiateRaftManagerWithConfigs("controller", "1")
+    assertTrue(raftManager.client.nodeId.getAsInt == 1)
+    raftManager.shutdown()
+  }
+
+  @Test
+  def testNodeIdPresentIfColocated(): Unit = {
+    val raftManager = instantiateRaftManagerWithConfigs("controller,broker", "1")
+    assertTrue(raftManager.client.nodeId.getAsInt == 1)
+    raftManager.shutdown()
+  }
+
   @Test
   def testShutdownIoThread(): Unit = {
     val raftClient = mock(classOf[KafkaRaftClient[String]])