You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/06/13 12:47:57 UTC
kafka git commit: KAFKA-5439; Verify that no unexpected threads are left behind in tests
Repository: kafka
Updated Branches:
refs/heads/trunk 435e5e196 -> aaca1b478
KAFKA-5439; Verify that no unexpected threads are left behind in tests
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3314 from rajinisivaram/KAFKA-5439
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aaca1b47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aaca1b47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aaca1b47
Branch: refs/heads/trunk
Commit: aaca1b478fcce99860e2eb89aa468ff863458572
Parents: 435e5e1
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Jun 13 13:47:41 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Tue Jun 13 13:47:41 2017 +0100
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 3 +-
.../kafka/clients/producer/KafkaProducer.java | 3 +-
.../controller/ControllerEventManager.scala | 5 +-
.../kafka/api/ConsumerBounceTest.scala | 4 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 54 +++++++++++++++++++-
5 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index aa3807e..d36f711 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -93,6 +93,7 @@ import org.apache.kafka.common.errors.InterruptException;
public abstract class AbstractCoordinator implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
+ public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread";
private enum MemberState {
UNJOINED, // the client is not part of a group
@@ -863,7 +864,7 @@ public abstract class AbstractCoordinator implements Closeable {
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
private HeartbeatThread() {
- super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true);
+ super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true);
}
public void enable() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9e84a31..1be30f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -148,6 +148,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
+ public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
private String clientId;
// Visible for testing
@@ -322,7 +323,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
- String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
+ String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 3c0da23..f7ed54e 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -24,13 +24,16 @@ import scala.collection._
import kafka.metrics.KafkaTimer
import kafka.utils.ShutdownableThread
+object ControllerEventManager {
+ val ControllerEventThreadName = "controller-event-thread"
+}
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventProcessedListener: ControllerEvent => Unit) {
@volatile private var _state: ControllerState = ControllerState.Idle
private val queue = new LinkedBlockingQueue[ControllerEvent]
- private val thread = new ControllerEventThread("controller-event-thread")
+ private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
def state: ControllerState = _state
http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 4057ccf..dc51d67 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -348,7 +348,9 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- createNewConsumer
+ val consumer = createNewConsumer
+ consumers += consumer
+ consumer
}
private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/aaca1b47/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a250633..0a7e631 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,13 +19,20 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import org.junit.{After, Before}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import org.junit.{After, AfterClass, Before, BeforeClass}
+import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
+import scala.collection.Set
+import scala.collection.JavaConverters._
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
+import kafka.controller.ControllerEventManager
+
@Category(Array(classOf[IntegrationTest]))
abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@@ -54,3 +61,46 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
Configuration.setConfiguration(null)
}
}
+
+object ZooKeeperTestHarness {
+ val ZkClientEventThreadPrefix = "ZkClient-EventThread"
+
+ // Threads which may cause transient failures in subsequent tests if not shut down.
+ // These include threads which make connections to brokers and may cause issues
+ // when broker ports are reused (e.g. auto-create topics) as well as threads
+ // which reset static JAAS configuration.
+ val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
+ KafkaProducer.NETWORK_THREAD_PREFIX,
+ AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+ ZkClientEventThreadPrefix)
+
+ /**
+ * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
+ * This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
+ * which is true for core tests where this harness is used.
+ */
+ @BeforeClass
+ def setUpClass() {
+ verifyNoUnexpectedThreads()
+ }
+
+ /**
+ * Verify that tests from the current test class using ZooKeeperTestHarness haven't left behind an unexpected thread
+ */
+ @AfterClass
+ def tearDownClass() {
+ verifyNoUnexpectedThreads()
+ }
+
+ /**
+ * Verifies that threads which are known to cause transient failures in subsequent tests
+ * have been shut down.
+ */
+ def verifyNoUnexpectedThreads() {
+ def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
+ val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
+ threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
+ }
+ assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected)
+ }
+}