Posted to commits@kafka.apache.org by ma...@apache.org on 2019/05/21 15:19:30 UTC

[kafka] branch trunk updated: KAFKA-3143: Controller should transition offline replicas on startup

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d77bac1  KAFKA-3143: Controller should transition offline replicas on startup
d77bac1 is described below

commit d77bac1c930afc7b7247f9a1a66b666801ae7798
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue May 21 20:48:49 2019 +0530

    KAFKA-3143: Controller should transition offline replicas on startup
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
    
    Closes #5041 from omkreddy/KAFKA-3143
---
 .../scala/kafka/controller/ControllerContext.scala | 22 +++++++++
 .../kafka/controller/ReplicaStateMachine.scala     |  8 +++-
 .../controller/ControllerIntegrationTest.scala     | 52 +++++++++++++++++++++
 .../kafka/controller/ReplicaStateMachineTest.scala | 43 +++++++++++++++++
 .../integration/UncleanLeaderElectionTest.scala    |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala     | 54 +++++++++++-----------
 6 files changed, 151 insertions(+), 32 deletions(-)
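
In short, the controller previously triggered only OnlineReplica transitions during startup, so replicas whose brokers were already down stayed in their initial state, which could leave other brokers with stale metadata for those partitions (see the new testMetadataPropagationForOfflineReplicas below). The sketch that follows illustrates the new startup flow in simplified, standalone form. It is not Kafka's actual API: Replica, assignments, and liveBrokers are hypothetical stand-ins for PartitionAndReplica, the controller's partition assignment cache, and its live broker set.

    object StartupFlowSketch {
      // Hypothetical stand-in for PartitionAndReplica.
      final case class Replica(topic: String, partition: Int, brokerId: Int)

      def main(args: Array[String]): Unit = {
        // (topic, partition) -> assigned broker ids; broker 2 is down while the controller starts.
        val assignments = Map(("topic1", 0) -> Seq(0, 1), ("topic1", 1) -> Seq(1, 2))
        val liveBrokers = Set(0, 1)

        // Flatten the assignment cache into one entry per (partition, broker) pair.
        val allReplicas = for {
          ((topic, partition), replicas) <- assignments.toSeq
          brokerId <- replicas
        } yield Replica(topic, partition, brokerId)

        // Rough equivalent of the new ControllerContext.onlineAndOfflineReplicas: one pass, two buckets.
        val (online, offline) = allReplicas.partition(r => liveBrokers.contains(r.brokerId))

        // The patched ReplicaStateMachine.startup() now triggers both transitions;
        // previously only the OnlineReplica call was made.
        println(s"handleStateChanges($online, OnlineReplica)")
        println(s"handleStateChanges($offline, OfflineReplica)")
      }
    }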

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 573a981..e41c899 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -175,6 +175,28 @@ class ControllerContext {
     }
   }
 
+  /**
+    * Get all online and offline replicas.
+    *
+    * @return a tuple containing the online replicas first, followed by the offline replicas
+    */
+  def onlineAndOfflineReplicas: (Set[PartitionAndReplica], Set[PartitionAndReplica]) = {
+    val onlineReplicas = mutable.Set.empty[PartitionAndReplica]
+    val offlineReplicas = mutable.Set.empty[PartitionAndReplica]
+    for ((topic, partitionReplicas) <- partitionAssignments;
+         (partitionId, replicas) <- partitionReplicas) {
+      val partition = new TopicPartition(topic, partitionId)
+      for (replica <- replicas) {
+        val partitionAndReplica = PartitionAndReplica(partition, replica)
+        if (isReplicaOnline(replica, partition))
+          onlineReplicas.add(partitionAndReplica)
+        else
+          offlineReplicas.add(partitionAndReplica)
+      }
+    }
+    (onlineReplicas, offlineReplicas)
+  }
+
   def replicasForPartition(partitions: collection.Set[TopicPartition]): collection.Set[PartitionAndReplica] = {
     partitions.flatMap { p =>
       val replicas = partitionReplicaAssignment(p)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ae2f629..d10d9b6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -36,7 +36,10 @@ abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends
     info("Initializing replica state")
     initializeReplicaState()
     info("Triggering online replica state changes")
-    handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
+    val (onlineReplicas, offlineReplicas) = controllerContext.onlineAndOfflineReplicas
+    handleStateChanges(onlineReplicas.toSeq, OnlineReplica)
+    info("Triggering offline replica state changes")
+    handleStateChanges(offlineReplicas.toSeq, OfflineReplica)
     debug(s"Started replica state machine with initial state -> ${controllerContext.replicaStates}")
   }
 
@@ -149,7 +152,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
    * -- remove the replica from the in memory partition replica assignment cache
    *
    * @param replicaId The replica for which the state transition is invoked
-   * @param partitions The partitions on this replica for which the state transition is invoked
+   * @param replicas The replicas (partition and replica pairs) for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
   private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
@@ -231,6 +234,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
         replicasWithoutLeadershipInfo.foreach { replica =>
           val currentState = controllerContext.replicaState(replica)
           logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
+          controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
           controllerContext.putReplicaState(replica, OfflineReplica)
         }
       case ReplicaDeletionStarted =>
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 4ecabf5..d57d78e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -165,6 +165,58 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testMetadataPropagationForOfflineReplicas(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+
+    // get the broker that will host the single-partition topic with replication factor 1
+    val replicaBroker = servers.filter(e => e.config.brokerId != controllerId).head
+
+    val controllerBroker = servers.filter(e => e.config.brokerId == controllerId).head
+    val otherBroker = servers.filter(e => e.config.brokerId != controllerId &&
+      e.config.brokerId != replicaBroker.config.brokerId).head
+
+    val topic = "topic1"
+    val assignment = Map(0 -> Seq(replicaBroker.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shutdown the other broker
+    otherBroker.shutdown()
+    otherBroker.awaitShutdown()
+
+    // Shutdown the broker with replica
+    replicaBroker.shutdown()
+    replicaBroker.awaitShutdown()
+
+    //Shutdown controller broker
+    controllerBroker.shutdown()
+    controllerBroker.awaitShutdown()
+
+    def verifyMetadata(broker: KafkaServer): Unit = {
+      broker.startup()
+      TestUtils.waitUntilTrue(() => {
+        val partitionInfoOpt = broker.metadataCache.getPartitionInfo(topic, 0)
+        if (partitionInfoOpt.isDefined) {
+          val partitionInfo = partitionInfoOpt.get
+          (!partitionInfo.offlineReplicas.isEmpty && partitionInfo.basePartitionState.leader == -1
+            && !partitionInfo.basePartitionState.replicas.isEmpty && !partitionInfo.basePartitionState.isr.isEmpty)
+        } else {
+          false
+        }
+      }, "Inconsistent metadata after broker startup")
+    }
+
+    //Start controller broker and check metadata
+    verifyMetadata(controllerBroker)
+
+    //Start other broker and check metadata
+    verifyMetadata(otherBroker)
+  }
+
+  @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)
     val tp = new TopicPartition("t", 0)
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 2a67e74..2420333 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -17,12 +17,15 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zookeeper.{GetDataResponse, ResponseMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -58,6 +61,40 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testStartupOnlinePartition(): Unit = {
+    val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+      SecurityProtocol.PLAINTEXT)
+    val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+    controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
+    assertEquals(None, controllerContext.replicaStates.get(replica))
+    replicaStateMachine.startup()
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testStartupOfflinePartition(): Unit = {
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
+    assertEquals(None, controllerContext.replicaStates.get(replica))
+    replicaStateMachine.startup()
+    assertEquals(OfflineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testStartupWithReplicaWithoutLeader(): Unit = {
+    val shutdownBrokerId = 100
+    val offlineReplica = PartitionAndReplica(partition, shutdownBrokerId)
+    val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+      SecurityProtocol.PLAINTEXT)
+    val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+    controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(shutdownBrokerId))
+    assertEquals(None, controllerContext.replicaStates.get(offlineReplica))
+    replicaStateMachine.startup()
+    assertEquals(OfflineReplica, replicaState(offlineReplica))
+  }
+
+  @Test
   def testNonexistentReplicaToNewReplicaTransition(): Unit = {
     replicaStateMachine.handleStateChanges(replicas, NewReplica)
     assertEquals(NewReplica, replicaState(replica))
@@ -108,10 +145,16 @@ class ReplicaStateMachineTest {
 
   @Test
   def testNewReplicaToOfflineReplicaTransition(): Unit = {
+    val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+      SecurityProtocol.PLAINTEXT)
+    val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+    controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
     controllerContext.putReplicaState(replica, NewReplica)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)), EasyMock.eq(partition), EasyMock.eq(false)))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(EasyMock.eq(Seq(brokerId)),  EasyMock.eq(Set(partition))))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
     EasyMock.verify(mockControllerBrokerRequestBatch)
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a6d856d..2974761 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -113,7 +113,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  @Ignore // Should be re-enabled after KAFKA-3096 is fixed
   def testUncleanLeaderElectionDisabled(): Unit = {
     // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
@@ -140,8 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  @Ignore // Should be re-enabled after KAFKA-3096 is fixed
-  def testCleanLeaderElectionDisabledByTopicOverride(): Unit = {
+  def testUncleanLeaderElectionDisabledByTopicOverride(): Unit = {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", "true")
     configProps2.put("unclean.leader.election.enable", "true")
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 280a5dc..c7ceb20 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -71,43 +71,43 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val topic = "new-topic"
     val partitionId = 0
 
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+
     // create topic with 1 partition, 2 replicas, one on each broker
     val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
     val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
-    debug("leader Epoch: " + leaderEpoch1)
-    debug("Leader is elected to be: %s".format(leader1))
-    // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1)
+    assertTrue("Leader should be broker 0", leader1 == 0)
     assertEquals("First epoch value should be 0", 0, leaderEpoch1)
 
-    // kill the server hosting the preferred replica
-    servers.last.shutdown()
+    // kill the server hosting the preferred replica/initial leader
+    servers.head.shutdown()
     // check if leader moves to the other server
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
-                                                    oldLeaderOpt = if (leader1 == 0) None else Some(leader1))
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader1))
     val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
-    debug("Leader is elected to be: %s".format(leader1))
-    debug("leader Epoch: " + leaderEpoch2)
-    assertEquals("Leader must move to broker 0", 0, leader2)
-    if (leader1 == leader2)
-      assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
-    else
-      assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
-
-    servers.last.startup()
-    servers.head.shutdown()
+    assertEquals("Leader must move to broker 1", 1, leader2)
+    // the new leaderEpoch will be leaderEpoch1 + 2: one increment during ReplicaStateMachine.startup() -> handleStateChanges
+    // for the offline replica, and one during PartitionStateMachine.triggerOnlinePartitionStateChange()
+    assertEquals("Second epoch value should be %d".format(leaderEpoch1 + 2) , leaderEpoch1 + 2, leaderEpoch2)
+
+    servers.head.startup()
+    //make sure second server joins the ISR
+    TestUtils.waitUntilTrue(() => {
+      val partitionInfoOpt = servers.last.metadataCache.getPartitionInfo(topic, partitionId)
+      if (partitionInfoOpt.isDefined) {
+        partitionInfoOpt.get.basePartitionState.isr.size() == 2
+      } else {
+        false
+      }
+    }, "Inconsistent metadata after second broker startup")
+
+    servers.last.shutdown()
+
     Thread.sleep(zookeeper.tickTime)
-    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
-                                                    oldLeaderOpt = if (leader2 == 1) None else Some(leader2))
+    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader2))
     val leaderEpoch3 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
-    debug("leader Epoch: " + leaderEpoch3)
-    debug("Leader is elected to be: %s".format(leader3))
-    assertEquals("Leader must return to 1", 1, leader3)
-    if (leader2 == leader3)
-      assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
-    else
-      assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
+    assertEquals("Leader must return to 0", 0, leader3)
+    assertEquals("Second epoch value should be %d".format(leaderEpoch2 + 2) , leaderEpoch2 + 2, leaderEpoch3)
   }
 
   @Test