You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/25 23:09:18 UTC

[kafka] branch trunk updated: KAFKA-13858; Kraft should not shutdown metadata listener until controller shutdown is finished (#12187)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 76477ffd2d KAFKA-13858; Kraft should not shutdown metadata listener until controller shutdown is finished (#12187)
76477ffd2d is described below

commit 76477ffd2d8a84803c57839fab6e0707045cd8f5
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu May 26 01:09:01 2022 +0200

    KAFKA-13858; Kraft should not shutdown metadata listener until controller shutdown is finished (#12187)
    
    When the KRaft broker begins controlled shutdown, it immediately disables the metadata listener. This means that metadata changes as part of the controlled shutdown do not get sent to the respective components. For partitions that the broker is a follower of, that is what we want. It prevents the follower from being able to rejoin the ISR while still shutting down. But for partitions that the broker is leading, it means the leader will remain active until controlled shutdown finishes an [...]
    
    This PR revises the controlled shutdown procedure as follows:
    * The broker signals to the replica manager that it is about to start the controlled shutdown.
    * The broker requests a controlled shutdown to the controller.
    * The controller moves leaders off from the broker, removes the broker from any ISR that it is a member of, and writes those changes to the metadata log.
    * When the broker receives a partition metadata change, it checks whether it is in the ISR. If it is, it updates the partition as usual. If it is not, or if there is no leader defined (as would be the case if the broker was the last member of the ISR), it stops the fetcher/replica. In effect, this stops all the partitions for which the broker was part of the ISR.
    
    When the broker is a replica of a partition but it is not in the ISR, the controller does not do anything. The leader epoch is not bumped. In this particular case, the follower will continue to run until the replica manager shuts down. During this time, the replica could become in-sync and the leader could try to bring it back into the ISR. This remaining issue will be addressed separately.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |   5 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  14 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 121 ++++++++++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 139 ++++++++++++++++++++-
 4 files changed, 218 insertions(+), 61 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 61d5f707dc..346d2ed184 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -700,13 +700,14 @@ class Partition(val topicPartition: TopicPartition,
         val leaderEpochEndOffset = followerLog.logEndOffset
         stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
           s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
-          s"high watermark ${followerLog.highWatermark}. Previous leader epoch was $leaderEpoch.")
+          s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " +
+          s"Previous leader epoch was $leaderEpoch.")
       } else {
         stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
           s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
       }
 
-      leaderReplicaIdOpt = Some(partitionState.leader)
+      leaderReplicaIdOpt = Option(partitionState.leader)
       leaderEpoch = partitionState.leaderEpoch
       leaderEpochStartOffsetOpt = None
       partitionEpoch = partitionState.partitionEpoch
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 514b94bab1..8d731a8139 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -484,11 +484,9 @@ class BrokerServer(
       info("shutting down")
 
       if (config.controlledShutdownEnable) {
-        // Shut down the broker metadata listener, so that we don't get added to any
-        // more ISRs.
-        if (metadataListener !=  null) {
-          metadataListener.beginShutdown()
-        }
+        if (replicaManager != null)
+          replicaManager.beginControlledShutdown()
+
         lifecycleManager.beginControlledShutdown()
         try {
           lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
@@ -499,6 +497,10 @@ class BrokerServer(
             error("Got unexpected exception waiting for controlled shutdown future", e)
         }
       }
+
+      if (metadataListener != null)
+        metadataListener.beginShutdown()
+
       lifecycleManager.beginShutdown()
 
       // Stop socket server to stop accepting any more connections and requests.
@@ -513,7 +515,7 @@ class BrokerServer(
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
       CoreUtils.swallow(authorizer.foreach(_.close()), this)
-      if (metadataListener !=  null) {
+      if (metadataListener != null) {
         CoreUtils.swallow(metadataListener.close(), this)
       }
       metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 190f80f36c..03983ad98d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -21,7 +21,6 @@ import java.util.Optional
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
-
 import com.yammer.metrics.core.Meter
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
@@ -60,6 +59,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
+import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
 
 import scala.jdk.CollectionConverters._
@@ -231,6 +231,8 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile private[server] var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
 
+  @volatile private var isInControlledShutdown = false
+
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
 
@@ -1878,6 +1880,10 @@ class ReplicaManager(val config: KafkaConfig,
     removeMetric("PartitionsWithLateTransactionsCount")
   }
 
+  def beginControlledShutdown(): Unit = {
+    isInControlledShutdown = true
+  }
+
   // High watermark do not need to be checkpointed only when under unit tests
   def shutdown(checkpointHW: Boolean = true): Unit = {
     info("Shutting down")
@@ -2088,12 +2094,12 @@ class ReplicaManager(val config: KafkaConfig,
     changedPartitions: mutable.Set[Partition],
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
-    newLocalLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
+    localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
   ): Unit = {
-    stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
+    stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " +
       "local leaders.")
-    replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
-    newLocalLeaders.forKeyValue { (tp, info) =>
+    replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
+    localLeaders.forKeyValue { (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
         try {
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
@@ -2118,36 +2124,39 @@ class ReplicaManager(val config: KafkaConfig,
     newImage: MetadataImage,
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
-    newLocalFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
+    localFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
   ): Unit = {
-    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
+    stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
       "local followers.")
     val shuttingDown = isShuttingDown.get()
-    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, Partition]
-    val newFollowerTopicSet = new mutable.HashSet[String]
-    newLocalFollowers.forKeyValue { (tp, info) =>
+    val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
+    val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
+    val followerTopicSet = new mutable.HashSet[String]
+    localFollowers.forKeyValue { (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
         try {
-          newFollowerTopicSet.add(tp.topic)
+          followerTopicSet.add(tp.topic)
 
           if (shuttingDown) {
             stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
               s"ID ${info.topicId} because the replica manager is shutting down.")
           } else {
-            val leader = info.partition.leader
-            if (newImage.cluster.broker(leader) == null) {
-              stateChangeLogger.trace(s"Unable to start fetching $tp with topic ID ${info.topicId} " +
-                s"from leader $leader because it is not alive.")
-
-              // Create the local replica even if the leader is unavailable. This is required
-              // to ensure that we include the partition's high watermark in the checkpoint
-              // file (see KAFKA-1647).
-              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
-            } else {
-              val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-              if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
-                partitionsToMakeFollower.put(tp, partition)
-              }
+            // We always update the follower state.
+            // - This ensures that a replica with no leader can step down;
+            // - This also ensures that the local replica is created even if the leader
+            //   is unavailable. This is required to ensure that we include the partition's
+            //   high watermark in the checkpoint file (see KAFKA-1647).
+            val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+            val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
+
+            if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
+                !info.partition.isr.contains(config.brokerId))) {
+              // During controlled shutdown, replicas with no leader and replicas
+              // where this broker is not in the ISR are stopped.
+              partitionsToStopFetching.put(tp, false)
+            } else if (isNewLeaderEpoch) {
+              // Otherwise, fetcher is restarted if the leader epoch has changed.
+              partitionsToStartFetching.put(tp, partition)
             }
           }
           changedPartitions.add(partition)
@@ -2170,33 +2179,47 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    // Stopping the fetchers must be done first in order to initialize the fetch
-    // position correctly.
-    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
-    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
-
-    val listenerName = config.interBrokerListenerName.value
-    val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
-    partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
-      val node = partition.leaderReplicaIdOpt
-        .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
-        .flatMap(_.node(listenerName).asScala)
-        .getOrElse(Node.noNode)
-      val log = partition.localLogOrException
-      partitionAndOffsets.put(topicPartition, InitialFetchState(
-        log.topicId,
-        new BrokerEndPoint(node.id, node.host, node.port),
-        partition.getLeaderEpoch,
-        initialFetchOffset(log)
-      ))
-    }
+    if (partitionsToStartFetching.nonEmpty) {
+      // Stopping the fetchers must be done first in order to initialize the fetch
+      // position correctly.
+      replicaFetcherManager.removeFetcherForPartitions(partitionsToStartFetching.keySet)
+      stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")
+
+      val listenerName = config.interBrokerListenerName.value
+      val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
+
+      partitionsToStartFetching.forKeyValue { (topicPartition, partition) =>
+        val nodeOpt = partition.leaderReplicaIdOpt
+          .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+          .flatMap(_.node(listenerName).asScala)
+
+        nodeOpt match {
+          case Some(node) =>
+            val log = partition.localLogOrException
+            partitionAndOffsets.put(topicPartition, InitialFetchState(
+              log.topicId,
+              new BrokerEndPoint(node.id, node.host, node.port),
+              partition.getLeaderEpoch,
+              initialFetchOffset(log)
+            ))
+          case None =>
+            stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
+              s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
+        }
+      }
 
-    replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
-    stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
+      replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
+      stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")
 
-    partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+      partitionsToStartFetching.keySet.foreach(completeDelayedFetchOrProduceRequests)
 
-    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
+      updateLeaderAndFollowerMetrics(followerTopicSet)
+    }
+
+    if (partitionsToStopFetching.nonEmpty) {
+      stopPartitions(partitionsToStopFetching)
+      stateChangeLogger.info(s"Stopped fetchers as part of controlled shutdown for ${partitionsToStopFetching.size} partitions")
+    }
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 80e6112518..50dcfb96eb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage, TopicsDelta, TopicsImage}
+import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -64,7 +65,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyInt}
-import org.mockito.Mockito.{mock, reset, times, verify, when}
+import org.mockito.Mockito.{mock, never, reset, times, verify, when}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
@@ -3826,8 +3827,8 @@ class ReplicaManagerTest {
       assertEquals(1, followerPartition.getPartitionEpoch)
 
       // Verify that partition's fetcher was not impacted.
-      verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set.empty)
-      verify(mockReplicaFetcherManager).addFetcherForPartitions(Map.empty)
+      verify(mockReplicaFetcherManager, never()).removeFetcherForPartitions(any())
+      verify(mockReplicaFetcherManager, never()).addFetcherForPartitions(any())
 
       reset(mockReplicaFetcherManager)
 
@@ -3840,7 +3841,7 @@ class ReplicaManagerTest {
         .setIsr(util.Arrays.asList(localId, localId + 1, localId + 2))
         .setLeader(localId + 2)
       )
-      println(followerTopicsDelta.changedTopic(FOO_UUID).partitionChanges())
+
       followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
@@ -3863,6 +3864,136 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testReplicasAreStoppedWhileInControlledShutdownWithKRaft(): Unit = {
+    val localId = 0
+    val foo0 = new TopicPartition("foo", 0)
+    val foo1 = new TopicPartition("foo", 1)
+    val foo2 = new TopicPartition("foo", 2)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(foo0, foo1))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      var topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
+      topicsDelta.replay(new TopicRecord()
+        .setName("foo")
+        .setTopicId(FOO_UUID)
+      )
+
+      // foo0 is a follower in the ISR.
+      topicsDelta.replay(new PartitionRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId, localId + 1))
+        .setLeader(localId + 1)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+      )
+
+      // foo1 is a leader with only itself in the ISR.
+      topicsDelta.replay(new PartitionRecord()
+        .setPartitionId(1)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId))
+        .setLeader(localId)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+      )
+
+      // foo2 is a follower NOT in the ISR.
+      topicsDelta.replay(new PartitionRecord()
+        .setPartitionId(2)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId + 1))
+        .setLeader(localId + 1)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+      )
+
+      // Apply the delta.
+      var metadataImage = imageFromTopics(topicsDelta.apply())
+      replicaManager.applyDelta(topicsDelta, metadataImage)
+
+      // Check the state of the partitions.
+      val HostedPartition.Online(fooPartition0) = replicaManager.getPartition(foo0)
+      assertFalse(fooPartition0.isLeader)
+      assertEquals(0, fooPartition0.getLeaderEpoch)
+      assertEquals(0, fooPartition0.getPartitionEpoch)
+
+      val HostedPartition.Online(fooPartition1) = replicaManager.getPartition(foo1)
+      assertTrue(fooPartition1.isLeader)
+      assertEquals(0, fooPartition1.getLeaderEpoch)
+      assertEquals(0, fooPartition1.getPartitionEpoch)
+
+      val HostedPartition.Online(fooPartition2) = replicaManager.getPartition(foo2)
+      assertFalse(fooPartition2.isLeader)
+      assertEquals(0, fooPartition2.getLeaderEpoch)
+      assertEquals(0, fooPartition2.getPartitionEpoch)
+
+      reset(mockReplicaFetcherManager)
+
+      // The replica begins the controlled shutdown.
+      replicaManager.beginControlledShutdown()
+
+      // When the controller receives the controlled shutdown
+      // request, it does the following:
+      // - Shrinks the ISR of foo0 to remove this replica.
+      // - Sets the leader of foo1 to NO_LEADER because it cannot elect another leader.
+      // - Does nothing for foo2 because this replica is not in the ISR.
+      topicsDelta = new TopicsDelta(metadataImage.topics())
+      topicsDelta.replay(new PartitionChangeRecord()
+        .setPartitionId(0)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId + 1))
+        .setLeader(localId + 1)
+      )
+      topicsDelta.replay(new PartitionChangeRecord()
+        .setPartitionId(1)
+        .setTopicId(FOO_UUID)
+        .setReplicas(util.Arrays.asList(localId, localId + 1))
+        .setIsr(util.Arrays.asList(localId))
+        .setLeader(NO_LEADER)
+      )
+      metadataImage = imageFromTopics(topicsDelta.apply())
+      replicaManager.applyDelta(topicsDelta, metadataImage)
+
+      // Partition foo0 and foo1 are updated.
+      assertFalse(fooPartition0.isLeader)
+      assertEquals(1, fooPartition0.getLeaderEpoch)
+      assertEquals(1, fooPartition0.getPartitionEpoch)
+      assertFalse(fooPartition1.isLeader)
+      assertEquals(1, fooPartition1.getLeaderEpoch)
+      assertEquals(1, fooPartition1.getPartitionEpoch)
+
+      // Partition foo2 is not.
+      assertFalse(fooPartition2.isLeader)
+      assertEquals(0, fooPartition2.getLeaderEpoch)
+      assertEquals(0, fooPartition2.getPartitionEpoch)
+
+      // Fetcher for foo0 and foo1 are stopped.
+      verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(foo0, foo1))
+    } finally {
+      // Fetcher for foo2 is stopped when the replica manager shuts down
+      // because this replica was not in the ISR.
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
   private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)