Posted to commits@kafka.apache.org by ma...@apache.org on 2019/05/21 15:19:55 UTC
[kafka] branch 2.3 updated: KAFKA-3143: Controller should transition offline replicas on startup
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 423610f KAFKA-3143: Controller should transition offline replicas on startup
423610f is described below
commit 423610f3d08275bd529b75d372ae7c49d2b03c42
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Tue May 21 20:48:49 2019 +0530
KAFKA-3143: Controller should transition offline replicas on startup
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #5041 from omkreddy/KAFKA-3143
(cherry picked from commit d77bac1c930afc7b7247f9a1a66b666801ae7798)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
.../scala/kafka/controller/ControllerContext.scala | 22 +++++++++
.../kafka/controller/ReplicaStateMachine.scala | 8 +++-
.../controller/ControllerIntegrationTest.scala | 52 +++++++++++++++++++++
.../kafka/controller/ReplicaStateMachineTest.scala | 43 +++++++++++++++++
.../integration/UncleanLeaderElectionTest.scala | 4 +-
.../unit/kafka/server/LeaderElectionTest.scala | 54 +++++++++++-----------
6 files changed, 151 insertions(+), 32 deletions(-)
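For readers skimming the diff below: on startup, the replica state machine now splits all known replicas into online and offline sets and transitions each set, instead of only transitioning replicas on live brokers. The following is a minimal, illustrative sketch of that flow only; ReplicaView, transition and startupTransitions are made-up names for this sketch and are not the actual controller classes.

object StartupSketch extends App {
  // Illustrative stand-in for PartitionAndReplica; not the real controller type.
  case class ReplicaView(topic: String, partition: Int, brokerId: Int)

  // Stand-in for handleStateChanges(replicas, targetState).
  def transition(replicas: Seq[ReplicaView], targetState: String): Unit =
    replicas.foreach(r => println(s"$r -> $targetState"))

  def startupTransitions(allReplicas: Seq[ReplicaView], liveBrokers: Set[Int]): Unit = {
    // Split replicas by whether their hosting broker is currently live (the crux of the fix).
    val (online, offline) = allReplicas.partition(r => liveBrokers.contains(r.brokerId))
    transition(online, "OnlineReplica")
    // Before this change, replicas on dead brokers were skipped here and kept stale state.
    transition(offline, "OfflineReplica")
  }

  startupTransitions(
    Seq(ReplicaView("t", 0, 1), ReplicaView("t", 0, 2)),
    liveBrokers = Set(1))
}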
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 573a981..e41c899 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -175,6 +175,28 @@ class ControllerContext {
}
}
+ /**
+ * Get all online and offline replicas.
+ *
+ * @return a tuple consisting of first the online replicas and followed by the offline replicas
+ */
+ def onlineAndOfflineReplicas: (Set[PartitionAndReplica], Set[PartitionAndReplica]) = {
+ val onlineReplicas = mutable.Set.empty[PartitionAndReplica]
+ val offlineReplicas = mutable.Set.empty[PartitionAndReplica]
+ for ((topic, partitionReplicas) <- partitionAssignments;
+ (partitionId, replicas) <- partitionReplicas) {
+ val partition = new TopicPartition(topic, partitionId)
+ for (replica <- replicas) {
+ val partitionAndReplica = PartitionAndReplica(partition, replica)
+ if (isReplicaOnline(replica, partition))
+ onlineReplicas.add(partitionAndReplica)
+ else
+ offlineReplicas.add(partitionAndReplica)
+ }
+ }
+ (onlineReplicas, offlineReplicas)
+ }
+
def replicasForPartition(partitions: collection.Set[TopicPartition]): collection.Set[PartitionAndReplica] = {
partitions.flatMap { p =>
val replicas = partitionReplicaAssignment(p)
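A quick usage sketch of the new onlineAndOfflineReplicas helper, assuming a ControllerContext populated the same way as in the unit tests further down; the broker ids, port and listener name here are made up for illustration:

import kafka.cluster.{Broker, EndPoint}
import kafka.controller.ControllerContext
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

val context = new ControllerContext
val endpoint = new EndPoint("localhost", 9092, new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT)
// Register broker 1 as live; broker 2 is never registered, so its replica is reported offline.
context.setLiveBrokerAndEpochs(Map(Broker(1, Seq(endpoint), rack = None) -> 1L))
context.updatePartitionReplicaAssignment(new TopicPartition("t", 0), Seq(1, 2))
val (online, offline) = context.onlineAndOfflineReplicas
// online holds the replica of t-0 on broker 1; offline holds the replica on broker 2.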
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ae2f629..d10d9b6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -36,7 +36,10 @@ abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends
info("Initializing replica state")
initializeReplicaState()
info("Triggering online replica state changes")
- handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
+ val (onlineReplicas, offlineReplicas) = controllerContext.onlineAndOfflineReplicas
+ handleStateChanges(onlineReplicas.toSeq, OnlineReplica)
+ info("Triggering offline replica state changes")
+ handleStateChanges(offlineReplicas.toSeq, OfflineReplica)
debug(s"Started replica state machine with initial state -> ${controllerContext.replicaStates}")
}
@@ -149,7 +152,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
* -- remove the replica from the in memory partition replica assignment cache
*
* @param replicaId The replica for which the state transition is invoked
- * @param partitions The partitions on this replica for which the state transition is invoked
+ * @param replicas The partitions on this replica for which the state transition is invoked
* @param targetState The end state that the replica should be moved to
*/
private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
@@ -231,6 +234,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
replicasWithoutLeadershipInfo.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
+ controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
controllerContext.putReplicaState(replica, OfflineReplica)
}
case ReplicaDeletionStarted =>
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 4ecabf5..d57d78e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -165,6 +165,58 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
}
@Test
+ def testMetadataPropagationForOfflineReplicas(): Unit = {
+ servers = makeServers(3)
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+
+ //get brokerId for topic creation with single partition and RF =1
+ val replicaBroker = servers.filter(e => e.config.brokerId != controllerId).head
+
+ val controllerBroker = servers.filter(e => e.config.brokerId == controllerId).head
+ val otherBroker = servers.filter(e => e.config.brokerId != controllerId &&
+ e.config.brokerId != replicaBroker.config.brokerId).head
+
+ val topic = "topic1"
+ val assignment = Map(0 -> Seq(replicaBroker.config.brokerId))
+
+ // Create topic
+ TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+ // Shutdown the other broker
+ otherBroker.shutdown()
+ otherBroker.awaitShutdown()
+
+ // Shutdown the broker with replica
+ replicaBroker.shutdown()
+ replicaBroker.awaitShutdown()
+
+ //Shutdown controller broker
+ controllerBroker.shutdown()
+ controllerBroker.awaitShutdown()
+
+ def verifyMetadata(broker: KafkaServer): Unit = {
+ broker.startup()
+ TestUtils.waitUntilTrue(() => {
+ val partitionInfoOpt = broker.metadataCache.getPartitionInfo(topic, 0)
+ if (partitionInfoOpt.isDefined) {
+ val partitionInfo = partitionInfoOpt.get
+ (!partitionInfo.offlineReplicas.isEmpty && partitionInfo.basePartitionState.leader == -1
+ && !partitionInfo.basePartitionState.replicas.isEmpty && !partitionInfo.basePartitionState.isr.isEmpty)
+ } else {
+ false
+ }
+ }, "Inconsistent metadata after broker startup")
+ }
+
+ //Start controller broker and check metadata
+ verifyMetadata(controllerBroker)
+
+ //Start other broker and check metadata
+ verifyMetadata(otherBroker)
+ }
+
+ @Test
def testTopicCreation(): Unit = {
servers = makeServers(1)
val tp = new TopicPartition("t", 0)
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 2a67e74..2420333 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -17,12 +17,15 @@
package kafka.controller
import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
import kafka.zookeeper.{GetDataResponse, ResponseMetadata}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@@ -58,6 +61,40 @@ class ReplicaStateMachineTest {
}
@Test
+ def testStartupOnlinePartition(): Unit = {
+ val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+ SecurityProtocol.PLAINTEXT)
+ val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+ controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
+ assertEquals(None, controllerContext.replicaStates.get(replica))
+ replicaStateMachine.startup()
+ assertEquals(OnlineReplica, replicaState(replica))
+ }
+
+ @Test
+ def testStartupOfflinePartition(): Unit = {
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
+ assertEquals(None, controllerContext.replicaStates.get(replica))
+ replicaStateMachine.startup()
+ assertEquals(OfflineReplica, replicaState(replica))
+ }
+
+ @Test
+ def testStartupWithReplicaWithoutLeader(): Unit = {
+ val shutdownBrokerId = 100
+ val offlineReplica = PartitionAndReplica(partition, shutdownBrokerId)
+ val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+ SecurityProtocol.PLAINTEXT)
+ val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+ controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(shutdownBrokerId))
+ assertEquals(None, controllerContext.replicaStates.get(offlineReplica))
+ replicaStateMachine.startup()
+ assertEquals(OfflineReplica, replicaState(offlineReplica))
+ }
+
+ @Test
def testNonexistentReplicaToNewReplicaTransition(): Unit = {
replicaStateMachine.handleStateChanges(replicas, NewReplica)
assertEquals(NewReplica, replicaState(replica))
@@ -108,10 +145,16 @@ class ReplicaStateMachineTest {
@Test
def testNewReplicaToOfflineReplicaTransition(): Unit = {
+ val endpoint1 = new EndPoint("localhost", 9997, new ListenerName("blah"),
+ SecurityProtocol.PLAINTEXT)
+ val liveBrokerEpochs = Map(Broker(brokerId, Seq(endpoint1), rack = None) -> 1L)
+ controllerContext.setLiveBrokerAndEpochs(liveBrokerEpochs)
controllerContext.putReplicaState(replica, NewReplica)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)), EasyMock.eq(partition), EasyMock.eq(false)))
+ EasyMock.expect(mockControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(EasyMock.eq(Seq(brokerId)), EasyMock.eq(Set(partition))))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+
EasyMock.replay(mockControllerBrokerRequestBatch)
replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
EasyMock.verify(mockControllerBrokerRequestBatch)
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a6d856d..2974761 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -113,7 +113,6 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- @Ignore // Should be re-enabled after KAFKA-3096 is fixed
def testUncleanLeaderElectionDisabled(): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
@@ -140,8 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
- @Ignore // Should be re-enabled after KAFKA-3096 is fixed
- def testCleanLeaderElectionDisabledByTopicOverride(): Unit = {
+ def testUncleanLeaderElectionDisabledByTopicOverride(): Unit = {
// enable unclean leader election globally, but disable for our specific test topic
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 280a5dc..c7ceb20 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -71,43 +71,43 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val topic = "new-topic"
val partitionId = 0
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+
// create topic with 1 partition, 2 replicas, one on each broker
val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
- debug("leader Epoch: " + leaderEpoch1)
- debug("Leader is elected to be: %s".format(leader1))
- // NOTE: this is to avoid transient test failures
- assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1)
+ assertTrue("Leader should be broker 0", leader1 == 0)
assertEquals("First epoch value should be 0", 0, leaderEpoch1)
- // kill the server hosting the preferred replica
- servers.last.shutdown()
+ // kill the server hosting the preferred replica/initial leader
+ servers.head.shutdown()
// check if leader moves to the other server
- val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
- oldLeaderOpt = if (leader1 == 0) None else Some(leader1))
+ val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader1))
val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
- debug("Leader is elected to be: %s".format(leader1))
- debug("leader Epoch: " + leaderEpoch2)
- assertEquals("Leader must move to broker 0", 0, leader2)
- if (leader1 == leader2)
- assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
- else
- assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
-
- servers.last.startup()
- servers.head.shutdown()
+ assertEquals("Leader must move to broker 1", 1, leader2)
+ // new leaderEpoch will be leaderEpoch1+2, one increment during ReplicaStateMachine.startup()-> handleStateChanges
+ // for offline replica and one increment during PartitionStateMachine.triggerOnlinePartitionStateChange()
+ assertEquals("Second epoch value should be %d".format(leaderEpoch1 + 2) , leaderEpoch1 + 2, leaderEpoch2)
+
+ servers.head.startup()
+ //make sure second server joins the ISR
+ TestUtils.waitUntilTrue(() => {
+ val partitionInfoOpt = servers.last.metadataCache.getPartitionInfo(topic, partitionId)
+ if (partitionInfoOpt.isDefined) {
+ partitionInfoOpt.get.basePartitionState.isr.size() == 2
+ } else {
+ false
+ }
+ }, "Inconsistent metadata after second broker startup")
+
+ servers.last.shutdown()
+
Thread.sleep(zookeeper.tickTime)
- val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
- oldLeaderOpt = if (leader2 == 1) None else Some(leader2))
+ val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader2))
val leaderEpoch3 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
- debug("leader Epoch: " + leaderEpoch3)
- debug("Leader is elected to be: %s".format(leader3))
- assertEquals("Leader must return to 1", 1, leader3)
- if (leader2 == leader3)
- assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
- else
- assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
+ assertEquals("Leader must return to 0", 0, leader3)
+ assertEquals("Second epoch value should be %d".format(leaderEpoch2 + 2) , leaderEpoch2 + 2, leaderEpoch3)
}
@Test