You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/03 20:17:00 UTC
[kafka] branch trunk updated: KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45fae33 KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)
45fae33 is described below
commit 45fae339373fe4bd90fb320695267b8340094fa3
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Mon Jun 3 13:16:38 2019 -0700
KAFKA-8383; Integration tests for unclean `electLeaders` (#6857)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/api/AdminClientIntegrationTest.scala | 346 +++++++++++++++++----
.../integration/UncleanLeaderElectionTest.scala | 2 +-
.../unit/kafka/server/LogDirFailureTest.scala | 1 -
.../scala/unit/kafka/server/LogRecoveryTest.scala | 1 -
4 files changed, 294 insertions(+), 56 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 66689e4..4145137 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -16,22 +16,22 @@
*/
package kafka.api
-import java.{time, util}
-import java.util.{Collections, Properties}
-import java.util.Arrays.asList
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.io.File
+import java.lang.{Long => JLong}
+import java.time.{Duration => JDuration}
+import java.util.Arrays.asList
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-
-import org.apache.kafka.clients.admin.KafkaAdminClientTest
-import org.apache.kafka.common.utils.{Time, Utils}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.{Collections, Properties}
+import java.{time, util}
import kafka.log.LogConfig
+import kafka.security.auth.{Cluster, Group, Topic}
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
-import org.apache.kafka.clients.admin._
-import kafka.utils.{Logging, TestUtils}
-import kafka.utils.TestUtils._
import kafka.utils.Implicits._
-import org.apache.kafka.clients.admin.NewTopic
+import kafka.utils.TestUtils._
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
@@ -42,23 +42,17 @@ import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
-import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
-import org.junit.rules.Timeout
+import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.Assert._
+import org.junit.rules.Timeout
+import org.junit.{After, Before, Rule, Test}
import org.scalatest.Assertions.intercept
-
-import scala.util.Random
import scala.collection.JavaConverters._
-import kafka.zk.KafkaZkClient
-
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
-import java.lang.{Long => JLong}
-import java.time.{Duration => JDuration}
-
-import kafka.security.auth.{Cluster, Group, Topic}
+import scala.util.Random
/**
* An integration test of the KafkaAdminClient.
@@ -1271,22 +1265,15 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0)
TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers)
- def currentLeader(topicPartition: TopicPartition) =
- client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
- get.partitions.get(topicPartition.partition).leader.id
-
def preferredLeader(topicPartition: TopicPartition) =
client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic).
get.partitions.get(topicPartition.partition).replicas.get(0).id
- def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) =
- TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000)
-
/** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */
def changePreferredLeader(newAssignment: Seq[Int]) = {
val preferred = newAssignment.head
- val prior1 = currentLeader(partition1)
- val prior2 = currentLeader(partition2)
+ val prior1 = currentLeader(client, partition1).get
+ val prior2 = currentLeader(client, partition2).get
var m = Map.empty[TopicPartition, Seq[Int]]
@@ -1301,26 +1288,26 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
10000)
// Check the leader hasn't moved
- assertEquals(prior1, currentLeader(partition1))
- assertEquals(prior2, currentLeader(partition2))
+ assertEquals(Some(prior1), currentLeader(client, partition1))
+ assertEquals(Some(prior2), currentLeader(client, partition2))
}
// Check current leaders are 0
- assertEquals(0, currentLeader(partition1))
- assertEquals(0, currentLeader(partition2))
+ assertEquals(Some(0), currentLeader(client, partition1))
+ assertEquals(Some(0), currentLeader(client, partition2))
// Noop election
var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
var exception = electResult.partitions.get.get(partition1).get
assertEquals(classOf[ElectionNotNeededException], exception.getClass)
assertEquals("Leader election not needed for topic partition", exception.getMessage)
- assertEquals(0, currentLeader(partition1))
+ assertEquals(Some(0), currentLeader(client, partition1))
// Noop election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertTrue(electResult.partitions.get.isEmpty)
- assertEquals(0, currentLeader(partition1))
- assertEquals(0, currentLeader(partition2))
+ assertEquals(Some(0), currentLeader(client, partition1))
+ assertEquals(Some(0), currentLeader(client, partition2))
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
@@ -1329,17 +1316,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition1).isPresent)
- waitForLeaderToBecome(partition1, 1)
+ waitForLeaderToBecome(client, partition1, Some(1))
// topic 2 unchanged
assertFalse(electResult.partitions.get.containsKey(partition2))
- assertEquals(0, currentLeader(partition2))
+ assertEquals(Some(0), currentLeader(client, partition2))
// meaningful election with null partitions
electResult = client.electLeaders(ElectionType.PREFERRED, null)
assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- waitForLeaderToBecome(partition2, 1)
+ waitForLeaderToBecome(client, partition2, Some(1))
// unknown topic
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
@@ -1348,8 +1335,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
- assertEquals(1, currentLeader(partition1))
- assertEquals(1, currentLeader(partition2))
+ assertEquals(Some(1), currentLeader(client, partition1))
+ assertEquals(Some(1), currentLeader(client, partition2))
// Now change the preferred leader to 2
changePreferredLeader(prefer2)
@@ -1357,8 +1344,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// mixed results
electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
- waitForLeaderToBecome(partition1, 2)
- assertEquals(1, currentLeader(partition2))
+ waitForLeaderToBecome(client, partition1, Some(2))
+ assertEquals(Some(1), currentLeader(client, partition2))
exception = electResult.partitions.get.get(unknownPartition).get
assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
assertEquals("The partition does not exist.", exception.getMessage)
@@ -1367,17 +1354,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
assertFalse(electResult.partitions.get.get(partition2).isPresent)
- waitForLeaderToBecome(partition2, 2)
+ waitForLeaderToBecome(client, partition2, Some(2))
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
// but shut it down...
servers(1).shutdown()
- waitUntilTrue (() => {
- val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all.get
- val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
- !isr.exists(_.id == 1)
- }, "Expect broker 1 to no longer be in any ISR")
+ waitForBrokerOutOfIsr(client, Set(partition1, partition2), 1)
// ... now what happens if we try to elect the preferred leader and it's down?
val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
@@ -1387,7 +1370,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
- assertEquals(2, currentLeader(partition1))
+ assertEquals(Some(2), currentLeader(client, partition1))
// preferred leader unavailable with null argument
electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
@@ -1402,8 +1385,235 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
- assertEquals(2, currentLeader(partition1))
- assertEquals(2, currentLeader(partition2))
+ assertEquals(Some(2), currentLeader(client, partition1))
+ assertEquals(Some(2), currentLeader(client, partition2))
+ }
+
+ @Test
+ def testElectUncleanLeadersForOnePartition(): Unit = {
+ // Case: unclean leader election with one topic partition
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val assignment1 = Seq(broker1, broker2)
+
+ val partition1 = new TopicPartition("unclean-test-topic-1", 0)
+ TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1), servers)
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+
+ servers(broker2).shutdown()
+ waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, None)
+ servers(broker2).startup()
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+ assertFalse(electResult.partitions.get.get(partition1).isPresent)
+ assertEquals(Option(broker2), currentLeader(client, partition1))
+ }
+
+ @Test
+ def testElectUncleanLeadersForManyPartitions(): Unit = {
+ // Case: unclean leader election with many topic partitions
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val assignment1 = Seq(broker1, broker2)
+ val assignment2 = Seq(broker1, broker2)
+
+ val topic = "unclean-test-topic-1"
+ val partition1 = new TopicPartition(topic, 0)
+ val partition2 = new TopicPartition(topic, 1)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+ servers
+ )
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+ waitForLeaderToBecome(client, partition2, Option(broker1))
+
+ servers(broker2).shutdown()
+ waitForBrokerOutOfIsr(client, Set(partition1, partition2), broker2)
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, None)
+ waitForLeaderToBecome(client, partition2, None)
+ servers(broker2).startup()
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
+ assertFalse(electResult.partitions.get.get(partition1).isPresent)
+ assertFalse(electResult.partitions.get.get(partition2).isPresent)
+ assertEquals(Option(broker2), currentLeader(client, partition1))
+ assertEquals(Option(broker2), currentLeader(client, partition2))
+ }
+
+ @Test
+ def testElectUncleanLeadersForAllPartitions(): Unit = {
+ // Case: noop unclean leader election and valid unclean leader election for all partitions
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val broker3 = 0
+ val assignment1 = Seq(broker1, broker2)
+ val assignment2 = Seq(broker1, broker3)
+
+ val topic = "unclean-test-topic-1"
+ val partition1 = new TopicPartition(topic, 0)
+ val partition2 = new TopicPartition(topic, 1)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+ servers
+ )
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+ waitForLeaderToBecome(client, partition2, Option(broker1))
+
+ servers(broker2).shutdown()
+ waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, None)
+ waitForLeaderToBecome(client, partition2, Some(broker3))
+ servers(broker2).startup()
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, null)
+ assertFalse(electResult.partitions.get.get(partition1).isPresent)
+ assertFalse(electResult.partitions.get.containsKey(partition2))
+ assertEquals(Option(broker2), currentLeader(client, partition1))
+ assertEquals(Option(broker3), currentLeader(client, partition2))
+ }
+
+ @Test
+ def testElectUncleanLeadersForUnknownPartitions(): Unit = {
+ // Case: unclean leader election for unknown topic
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val assignment1 = Seq(broker1, broker2)
+
+ val topic = "unclean-test-topic-1"
+ val unknownPartition = new TopicPartition(topic, 1)
+ val unknownTopic = new TopicPartition("unknown-topic", 0)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(0 -> assignment1),
+ servers
+ )
+
+ waitForLeaderToBecome(client, new TopicPartition(topic, 0), Option(broker1))
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(unknownPartition, unknownTopic).asJava)
+ assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException])
+ assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException])
+ }
+
+ @Test
+ def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = {
+ // Case: unclean leader election with no live brokers
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val assignment1 = Seq(broker1, broker2)
+
+ val topic = "unclean-test-topic-1"
+ val partition1 = new TopicPartition(topic, 0)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(partition1.partition -> assignment1),
+ servers
+ )
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+
+ servers(broker2).shutdown()
+ waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, None)
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+ assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException])
+ }
+
+ @Test
+ def testElectUncleanLeadersNoop(): Unit = {
+ // Case: noop unclean leader election with explicit topic partitions
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val assignment1 = Seq(broker1, broker2)
+
+ val topic = "unclean-test-topic-1"
+ val partition1 = new TopicPartition(topic, 0)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(partition1.partition -> assignment1),
+ servers
+ )
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, Some(broker2))
+ servers(broker1).startup()
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
+ assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException])
+ }
+
+ @Test
+ def testElectUncleanLeadersAndNoop(): Unit = {
+ // Case: one noop unclean leader election and one valid unclean leader election
+ client = AdminClient.create(createConfig)
+
+ val broker1 = 1
+ val broker2 = 2
+ val broker3 = 0
+ val assignment1 = Seq(broker1, broker2)
+ val assignment2 = Seq(broker1, broker3)
+
+ val topic = "unclean-test-topic-1"
+ val partition1 = new TopicPartition(topic, 0)
+ val partition2 = new TopicPartition(topic, 1)
+
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ Map(partition1.partition -> assignment1, partition2.partition -> assignment2),
+ servers
+ )
+
+ waitForLeaderToBecome(client, partition1, Option(broker1))
+ waitForLeaderToBecome(client, partition2, Option(broker1))
+
+ servers(broker2).shutdown()
+ waitForBrokerOutOfIsr(client, Set(partition1), broker2)
+ servers(broker1).shutdown()
+ waitForLeaderToBecome(client, partition1, None)
+ waitForLeaderToBecome(client, partition2, Some(broker3))
+ servers(broker2).startup()
+
+ val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava)
+ assertFalse(electResult.partitions.get.get(partition1).isPresent)
+ assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException])
+ assertEquals(Option(broker2), currentLeader(client, partition1))
+ assertEquals(Option(broker3), currentLeader(client, partition2))
}
@Test
@@ -1724,4 +1934,34 @@ object AdminClientIntegrationTest {
assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
+ def currentLeader(client: AdminClient, topicPartition: TopicPartition): Option[Int] = {
+ Option(
+ client
+ .describeTopics(asList(topicPartition.topic))
+ .all
+ .get
+ .get(topicPartition.topic)
+ .partitions
+ .get(topicPartition.partition)
+ .leader
+ ).map(_.id)
+ }
+
+ def waitForLeaderToBecome(client: AdminClient, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+ TestUtils.waitUntilTrue(
+ () => currentLeader(client, topicPartition) == leader,
+ s"Expected leader to become $leader", 10000
+ )
+ }
+
+ def waitForBrokerOutOfIsr(client: AdminClient, partitions: Set[TopicPartition], brokerId: Int): Unit = {
+ TestUtils.waitUntilTrue(
+ () => {
+ val description = client.describeTopics(partitions.map(_.topic).asJava).all.get.asScala
+ val isr = description.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+ isr.forall(_.id != brokerId)
+ },
+ s"Expect broker $brokerId to no longer be in any ISR for $partitions"
+ )
+ }
}
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 2974761..c0c8c95 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -18,7 +18,7 @@
package kafka.integration
import org.apache.kafka.common.config.ConfigException
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Before, Test}
import scala.util.Random
import scala.collection.JavaConverters._
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6b69c41..f83562c 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.server.LogDirFailureTest._
import kafka.api.IntegrationTestHarness
-import kafka.cluster.Partition
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.{CoreUtils, Exit, TestUtils}
import org.apache.kafka.clients.consumer.KafkaConsumer
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 780d189..6ab3138 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,7 +23,6 @@ import TestUtils._
import kafka.zk.ZooKeeperTestHarness
import java.io.File
-import kafka.cluster.Partition
import kafka.server.checkpoints.OffsetCheckpointFile
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition