You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/03 00:19:53 UTC
kafka git commit: MINOR: Fix setting of ACLs and ZK shutdown in test
harnesses
Repository: kafka
Updated Branches:
refs/heads/trunk 245fa2bd8 -> 102903046
MINOR: Fix setting of ACLs and ZK shutdown in test harnesses
I found both issues while investigating the issue described in PR #1425.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Sriharsha Chintalapani <sc...@hortonworks.com>, Jun Rao <ju...@gmail.com>
Closes #1455 from ijuma/fix-integration-test-harness-and-zk-test-harness
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10290304
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10290304
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10290304
Branch: refs/heads/trunk
Commit: 1029030466f01937d416e11f93562bcaaecce253
Parents: 245fa2b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Jun 3 01:19:46 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 3 01:19:46 2016 +0100
----------------------------------------------------------------------
.../kafka/api/EndToEndAuthorizationTest.scala | 7 ++--
.../integration/KafkaServerTestHarness.scala | 34 ++++++++++++--------
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 ++++++
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 13 --------
4 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index fec96cd..e13f160 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -61,12 +61,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
override val producerCount = 1
override val consumerCount = 2
override val serverCount = 3
- override val setClusterAcl = Some { () =>
+
+ override def setAclsBeforeServersStart() {
AclCommand.main(clusterAclArgs)
- servers.foreach(s =>
- TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
- )
}
+
val numRecords = 1
val group = "group"
val topic = "e2etopic"
http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8e8ae8b..7059d17 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -38,7 +38,6 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
var brokerList: String = null
var alive: Array[Boolean] = null
val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
- val setClusterAcl: Option[() => Unit] = None
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
@@ -46,13 +45,26 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
*/
def generateConfigs(): Seq[KafkaConfig]
+ /**
+ * Override this in case ACLs must be set before `servers` are started.
+ *
+ * This is required in some cases because of the topic creation in the setup of `IntegrationTestHarness`. If the ACLs
+ * are only set later, tests may fail. The failure could manifest itself as a cluster action
+ * authorization exception when processing an update metadata request (controller -> broker) or in more obscure
+ * ways (e.g. __consumer_offsets topic replication fails because the metadata cache has no brokers as a previous
+ * update metadata request failed due to an authorization exception).
+ *
+ * The default implementation of this method is a no-op.
+ */
+ def setAclsBeforeServersStart() {}
+
def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
instanceConfigs = generateConfigs()
instanceConfigs
}
- def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
+ def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def trustStoreFile: Option[File] = None
@@ -61,23 +73,17 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp
- if (configs.size <= 0)
+
+ if (configs.isEmpty)
throw new KafkaException("Must supply at least one server config.")
+
+ // default implementation is a no-op, it is overridden by subclasses if required
+ setAclsBeforeServersStart()
+
servers = configs.map(TestUtils.createServer(_)).toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
- // We need to set a cluster ACL in some cases here
- // because of the topic creation in the setup of
- // IntegrationTestHarness. If we don't, then tests
- // fail with a cluster action authorization exception
- // when processing an update metadata request
- // (controller -> broker).
- //
- // The following method does nothing by default, but
- // if the test case requires setting up a cluster ACL,
- // then it needs to be implemented.
- setClusterAcl.foreach(_.apply)
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 1030c46..22465ea 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -39,6 +39,16 @@ class EmbeddedZookeeper() {
def shutdown() {
CoreUtils.swallow(zookeeper.shutdown())
CoreUtils.swallow(factory.shutdown())
+
+ def isDown(): Boolean = {
+ try {
+ ZkFourLetterWords.sendStat("127.0.0.1", port, 3000)
+ false
+ } catch { case _: Throwable => true }
+ }
+
+ Iterator.continually(isDown()).exists(identity)
+
Utils.delete(logDir)
Utils.delete(snapshotDir)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/10290304/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0de11cd..305e074 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -46,19 +46,6 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
CoreUtils.swallow(zkUtils.close())
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown())
-
- def isDown(): Boolean = {
- try {
- ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
- false
- } catch { case _: Throwable =>
- debug("Server is down")
- true
- }
- }
-
- Iterator.continually(isDown()).exists(identity)
-
Configuration.setConfiguration(null)
}