You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/03/07 10:04:23 UTC

[kafka] branch trunk updated: KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7373789  KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
7373789 is described below

commit 73737892d5a51688b3403454b6e0f1f59c85ba8d
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 7 15:33:54 2019 +0530

    KAFKA-8018: Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
    
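    Disable forceSync (fsync of the ZK transaction log) in EmbeddedZookeeper to avoid timeouts in tests.
    Increase the ZK tickTime from 500ms to 800ms to allow a longer maxSessionTimeout in tests.
    Increase the ZK client session timeout in tests from 6000ms to 15000ms.
    Handle transient ZK session expiration exceptions in TestUtils when creating topics (createTopic and createTopicWithAssignment).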
    
    Author: Jun Rao <ju...@gmail.com>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
    
    Closes #6354 from junrao/KAFKA-8018
---
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 25 ++++++++++++++++++++--
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala    |  4 +++-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  2 +-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d61771b..8b8b230 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
@@ -301,7 +302,17 @@ object TestUtils extends Logging {
                   topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+    TestUtils.waitUntilTrue( () => {
+      var hasSessionExpirationException = false
+      try {
+        adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
+      } catch {
+        case _: SessionExpiredException => hasSessionExpirationException = true
+        case e => throw e // let other exceptions propagate
+      }
+      !hasSessionExpirationException},
+      s"Can't create topic $topic")
+
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
@@ -333,7 +344,17 @@ object TestUtils extends Logging {
                   topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
-    adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+    TestUtils.waitUntilTrue( () => {
+      var hasSessionExpirationException = false
+      try {
+        adminZkClient.createTopicWithAssignment(topic, topicConfig, partitionReplicaAssignment)
+      } catch {
+        case _: SessionExpiredException => hasSessionExpirationException = true
+        case e => throw e // let other exceptions propagate
+      }
+      !hasSessionExpirationException},
+      s"Can't create topic $topic")
+
     // wait until the update metadata request for new topic reaches all servers
     partitionReplicaAssignment.keySet.map { i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index d4a829d..32ca969 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -38,7 +38,9 @@ class EmbeddedZookeeper() extends Logging {
 
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
-  val tickTime = 500
+  val tickTime = 800 // allow a maxSessionTimeout of 20 * 800ms = 16 secs
+
+  System.setProperty("zookeeper.forceSync", "no")  //disable fsync to ZK txn log in tests to avoid timeout
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val factory = new NIOServerCnxnFactory()
   private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 2f75fa2..ebb5fb7 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -40,7 +40,7 @@ import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   val zkConnectionTimeout = 10000
-  val zkSessionTimeout = 6000
+  val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
   val zkMaxInFlightRequests = Int.MaxValue
 
   protected def zkAclsEnabled: Option[Boolean] = None