You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/03/07 10:07:42 UTC
[kafka] branch 2.2 updated: KAFKA-8018: Flaky Test
SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 78648da KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
78648da is described below
commit 78648dab04f5b9edf598c73d0cf3b11e529e9c45
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 7 15:33:54 2019 +0530
KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
Disable forceSync in EmbeddedZookeeper.
Increase ZK tick to allow longer maxSessionTimeout in tests.
Increase ZK client session timeout in tests.
Handle transient ZK session expiration exception in test utils for createTopic.
Author: Jun Rao <ju...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #6354 from junrao/KAFKA-8018
---
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++++++++++++++++++--
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 4 +++-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c582bc5..3afe520 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.zookeeper.KeeperException.SessionExpiredException
import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
import org.junit.Assert._
@@ -301,7 +302,17 @@ object TestUtils extends Logging {
topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
- adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+ TestUtils.waitUntilTrue( () => {
+ var hasSessionExpirationException = false
+ try {
+ adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+ } catch {
+ case _: SessionExpiredException => hasSessionExpirationException = true
+ case e => throw e // let other exceptions propagate
+ }
+ !hasSessionExpirationException},
+ s"Can't create topic $topic")
+
// wait until the update metadata request for new topic reaches all servers
(0 until numPartitions).map { i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
@@ -333,7 +344,17 @@ object TestUtils extends Logging {
topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
- adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+ TestUtils.waitUntilTrue( () => {
+ var hasSessionExpirationException = false
+ try {
+ adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+ } catch {
+ case _: SessionExpiredException => hasSessionExpirationException = true
+ case e => throw e // let other exceptions propagate
+ }
+ !hasSessionExpirationException},
+ s"Can't create topic $topic")
+
// wait until the update metadata request for new topic reaches all servers
partitionReplicaAssignment.keySet.map { i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index d4a829d..32ca969 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -38,7 +38,9 @@ class EmbeddedZookeeper() extends Logging {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
- val tickTime = 500
+ val tickTime = 800 // allow a maxSessionTimeout of 20 * 800ms = 16 secs
+
+ System.setProperty("zookeeper.forceSync", "no") //disable fsync to ZK txn log in tests to avoid timeout
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 2f75fa2..ebb5fb7 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -40,7 +40,7 @@ import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
val zkConnectionTimeout = 10000
- val zkSessionTimeout = 6000
+ val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
val zkMaxInFlightRequests = Int.MaxValue
protected def zkAclsEnabled: Option[Boolean] = None