You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/03 00:19:53 UTC

kafka git commit: MINOR: Fix setting of ACLs and ZK shutdown in test harnesses

Repository: kafka
Updated Branches:
  refs/heads/trunk 245fa2bd8 -> 102903046


MINOR: Fix setting of ACLs and ZK shutdown in test harnesses

Both issues were found while investigating the problem described in PR #1425.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Sriharsha Chintalapani <sc...@hortonworks.com>, Jun Rao <ju...@gmail.com>

Closes #1455 from ijuma/fix-integration-test-harness-and-zk-test-harness


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10290304
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10290304
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10290304

Branch: refs/heads/trunk
Commit: 1029030466f01937d416e11f93562bcaaecce253
Parents: 245fa2b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jun 3 01:19:46 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 3 01:19:46 2016 +0100

----------------------------------------------------------------------
 .../kafka/api/EndToEndAuthorizationTest.scala   |  7 ++--
 .../integration/KafkaServerTestHarness.scala    | 34 ++++++++++++--------
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 ++++++
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    | 13 --------
 4 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index fec96cd..e13f160 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -61,12 +61,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   override val producerCount = 1
   override val consumerCount = 2
   override val serverCount = 3
-  override val setClusterAcl = Some { () =>
+
+  override def setAclsBeforeServersStart() {
     AclCommand.main(clusterAclArgs)
-    servers.foreach(s =>
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
-    )
   }
+
   val numRecords = 1
   val group = "group"
   val topic = "e2etopic"

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8e8ae8b..7059d17 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -38,7 +38,6 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   var brokerList: String = null
   var alive: Array[Boolean] = null
   val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
-  val setClusterAcl: Option[() => Unit] = None
 
   /**
    * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
@@ -46,13 +45,26 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
    */
   def generateConfigs(): Seq[KafkaConfig]
 
+  /**
+   * Override this in case ACLs must be set before `servers` are started.
+   *
+   * This is required in some cases because of the topic creation in the setup of `IntegrationTestHarness`. If the ACLs
+   * are only set later, tests may fail. The failure could manifest itself as a cluster action
+   * authorization exception when processing an update metadata request (controller -> broker) or in more obscure
+   * ways (e.g. __consumer_offsets topic replication fails because the metadata cache has no brokers as a previous
+   * update metadata request failed due to an authorization exception).
+   *
+   * The default implementation of this method is a no-op.
+   */
+  def setAclsBeforeServersStart() {}
+
   def configs: Seq[KafkaConfig] = {
     if (instanceConfigs == null)
       instanceConfigs = generateConfigs()
     instanceConfigs
   }
 
-  def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
+  def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
 
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
   protected def trustStoreFile: Option[File] = None
@@ -61,23 +73,17 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp
-    if (configs.size <= 0)
+
+    if (configs.isEmpty)
       throw new KafkaException("Must supply at least one server config.")
+
+    // default implementation is a no-op, it is overridden by subclasses if required
+    setAclsBeforeServersStart()
+
     servers = configs.map(TestUtils.createServer(_)).toBuffer
     brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
-    // We need to set a cluster ACL in some cases here
-    // because of the topic creation in the setup of
-    // IntegrationTestHarness. If we don't, then tests
-    // fail with a cluster action authorization exception
-    // when processing an update metadata request
-    // (controller -> broker).
-    //
-    // The following method does nothing by default, but
-    // if the test case requires setting up a cluster ACL,
-    // then it needs to be implemented.
-    setClusterAcl.foreach(_.apply)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 1030c46..22465ea 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -39,6 +39,16 @@ class EmbeddedZookeeper() {
   def shutdown() {
     CoreUtils.swallow(zookeeper.shutdown())
     CoreUtils.swallow(factory.shutdown())
+
+    def isDown(): Boolean = {
+      try {
+        ZkFourLetterWords.sendStat("127.0.0.1", port, 3000)
+        false
+      } catch { case _: Throwable => true }
+    }
+
+    Iterator.continually(isDown()).exists(identity)
+
     Utils.delete(logDir)
     Utils.delete(snapshotDir)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0de11cd..305e074 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -46,19 +46,6 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
      CoreUtils.swallow(zkUtils.close())
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
-
-    def isDown(): Boolean = {
-      try {
-        ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
-        false
-      } catch { case _: Throwable =>
-        debug("Server is down")
-        true
-      }
-    }
-
-    Iterator.continually(isDown()).exists(identity)
-
     Configuration.setConfiguration(null)
   }