You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 15:17:58 UTC
kafka git commit: KAFKA-6164; Shutdown quota managers if other components fail to start
Repository: kafka
Updated Branches:
refs/heads/trunk 4a55818bb -> fe3171844
KAFKA-6164; Shutdown quota managers if other components fail to start
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Viktor Somogyi <vi...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #4181 from rajinisivaram/KAFKA-6164
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe317184
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe317184
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe317184
Branch: refs/heads/trunk
Commit: fe3171844b599a3e6c0cfe1e606a21eb4ef32ce6
Parents: 4a55818
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Nov 9 15:17:52 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 15:17:52 2017 +0000
----------------------------------------------------------------------
.../scala/kafka/server/ClientQuotaManager.scala | 9 ++++---
.../server/ClientRequestQuotaManager.scala | 4 ++-
.../src/main/scala/kafka/server/KafkaApis.scala | 1 -
.../main/scala/kafka/server/KafkaServer.scala | 4 ++-
.../main/scala/kafka/server/QuotaFactory.scala | 8 +++---
.../kafka/server/ClientQuotaManagerTest.scala | 15 +++++------
.../server/HighwatermarkPersistenceTest.scala | 4 +--
.../unit/kafka/server/ISRExpirationTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 8 +++++-
.../kafka/server/ReplicaManagerQuotasTest.scala | 2 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 10 ++++----
.../unit/kafka/server/ServerShutdownTest.scala | 26 ++++++++++++++++++--
.../unit/kafka/server/SimpleFetchTest.scala | 2 +-
.../ThrottledResponseExpirationTest.scala | 4 +--
.../epoch/OffsetsForLeaderEpochTest.scala | 6 ++---
15 files changed, 69 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 96739f6..a6efa92 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -106,7 +106,8 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val quotaType: QuotaType,
- private val time: Time) extends Logging {
+ private val time: Time,
+ threadNamePrefix: String) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
@volatile private var quotaTypesEnabled =
@@ -115,7 +116,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
private val sensorAccessor = new SensorAccess(lock, metrics)
- val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
+ private[server] val throttledRequestReaper = new ThrottledRequestReaper(delayQueue, threadNamePrefix)
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
@@ -130,8 +131,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Reaper thread that triggers callbacks on all throttled requests
* @param delayQueue DelayQueue to dequeue from
*/
- class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
- "ThrottledRequestReaper-%s".format(quotaType), false) {
+ class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse], prefix: String) extends ShutdownableThread(
+ s"${prefix}ThrottledRequestReaper-${quotaType}", false) {
override def doWork(): Unit = {
val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index f454483..59fa421 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -26,7 +26,9 @@ import org.apache.kafka.common.utils.Time
class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
- private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+ private val time: Time,
+ threadNamePrefix: String)
+ extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix) {
val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3f8ac49..b9a1971 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -86,7 +86,6 @@ class KafkaApis(val requestChannel: RequestChannel,
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
def close() {
- quotas.shutdown()
info("Shutdown complete.")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df36662..a0732fd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -216,7 +216,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
- quotaManagers = QuotaFactory.instantiate(config, metrics, time)
+ quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -556,6 +556,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (zkClient != null)
CoreUtils.swallow(zkClient.close())
+ if (quotaManagers != null)
+ CoreUtils.swallow(quotaManagers.shutdown())
if (metrics != null)
CoreUtils.swallow(metrics.close())
if (brokerTopicStats != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 13cd010..01441b5 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -52,11 +52,11 @@ object QuotaFactory extends Logging {
}
}
- def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time): QuotaManagers = {
+ def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
QuotaManagers(
- new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
- new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
- new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
+ new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix),
+ new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix),
+ new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 31d32d2..1aabbb3 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -39,7 +39,7 @@ class ClientQuotaManagerTest {
}
private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
- val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time, "")
try {
// Case 1: Update the quota. Assert that the new quota value is returned
@@ -149,7 +149,8 @@ class ClientQuotaManagerTest {
@Test
def testQuotaConfigPrecedence() {
- val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue), newMetrics, QuotaType.Produce, time)
+ val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
+ newMetrics, QuotaType.Produce, time, "")
def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
@@ -222,7 +223,7 @@ class ClientQuotaManagerTest {
@Test
def testQuotaViolation() {
val metrics = newMetrics
- val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try {
/* We have 10 second windows. Make sure that there is no quota violation
@@ -270,7 +271,7 @@ class ClientQuotaManagerTest {
@Test
def testRequestPercentageQuotaViolation() {
val metrics = newMetrics
- val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
+ val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "")
quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
@@ -332,7 +333,7 @@ class ClientQuotaManagerTest {
@Test
def testExpireThrottleTimeSensor() {
val metrics = newMetrics
- val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
try {
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
// remove the throttle time sensor
@@ -351,7 +352,7 @@ class ClientQuotaManagerTest {
@Test
def testExpireQuotaSensors() {
val metrics = newMetrics
- val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
try {
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
// remove all the sensors
@@ -375,7 +376,7 @@ class ClientQuotaManagerTest {
@Test
def testClientIdNotSanitized() {
val metrics = newMetrics
- val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
val clientId = "client@#$%"
try {
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 63aeffb..efae329 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -63,7 +63,7 @@ class HighwatermarkPersistenceTest {
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
- logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+ logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
try {
@@ -108,7 +108,7 @@ class HighwatermarkPersistenceTest {
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
- scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+ scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 000dbb5..8212ed6 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -59,7 +59,7 @@ class IsrExpirationTest {
EasyMock.replay(logManager)
replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(configs.head, metrics, time), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+ QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
new LogDirFailureChannel(configs.head.logDirs.size))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c3b9ceb..cabbd5d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.{After, Test}
import scala.collection.JavaConverters._
import scala.collection.Map
@@ -74,6 +74,12 @@ class KafkaApisTest {
private val clusterId = "clusterId"
private val time = new MockTime
+ @After
+ def tearDown() {
+ quotas.shutdown()
+ metrics.close()
+ }
+
def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a21b240..38085b8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -183,7 +183,7 @@ class ReplicaManagerQuotasTest {
replay(logManager)
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+ new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
//create the two replicas
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 10a66bd..b9d884a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,7 +69,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
@@ -88,7 +88,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
try {
def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
@@ -139,7 +139,7 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size))
try {
@@ -611,7 +611,7 @@ class ReplicaManagerTest {
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
- new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, Option(this.getClass.getName))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 23c40fe..75adf55 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,9 +26,12 @@ import java.io.File
import kafka.log.LogManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.I0Itec.zkclient.exception.ZkException
import org.junit.{Before, Test}
import org.junit.Assert._
+import scala.reflect.ClassTag
class ServerShutdownTest extends ZooKeeperTestHarness {
var config: KafkaConfig = null
@@ -127,7 +130,25 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val newProps = TestUtils.createBrokerConfig(0, zkConnect)
newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
val newConfig = KafkaConfig.fromProps(newProps)
- val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
+ verifyCleanShutdownAfterFailedStartup[ZkException](newConfig)
+ }
+
+ @Test
+ def testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
+ var server = new KafkaServer(config)
+ server.startup()
+ createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+ server.shutdown()
+ server.awaitShutdown()
+ config.logDirs.foreach { dirName =>
+ val partitionDir = new File(dirName, s"$topic-0")
+ partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
+ }
+ verifyCleanShutdownAfterFailedStartup[KafkaStorageException](config)
+ }
+
+ private def verifyCleanShutdownAfterFailedStartup[E <: Exception](config: KafkaConfig)(implicit exceptionClassTag: ClassTag[E]) {
+ val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
try {
server.startup()
fail("Expected KafkaServer setup to fail and throw exception")
@@ -136,7 +157,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// Try to clean up carefully without hanging even if the test fails. This means trying to accurately
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
- case _: org.I0Itec.zkclient.exception.ZkException =>
+ case e: Exception =>
+ assertTrue(s"Unexpected exception $e", exceptionClassTag.runtimeClass.isInstance(e))
assertEquals(NotRunning.state, server.brokerState.currentState)
}
finally {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index c6d3384..92b230e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -112,7 +112,7 @@ class SimpleFetchTest {
// create the replica manager
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
- new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time), new BrokerTopicStats,
+ new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
// add the partition with two replicas, both in ISR
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index aae66d8..d7634c0 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -43,10 +43,10 @@ class ThrottledResponseExpirationTest {
@Test
def testExpire() {
- val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time)
+ val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time, "")
val delayQueue = new DelayQueue[ThrottledResponse]()
- val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
+ val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue, "")
try {
// Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
delayQueue.add(new ThrottledResponse(time, 10, callback))
http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 32bf29c..1c01d62 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -56,7 +56,7 @@ class OffsetsForLeaderEpochTest {
// create a replica manager with 1 partition that has 1 replica
val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
val partition = replicaManager.getOrCreatePartition(tp)
val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
@@ -78,7 +78,7 @@ class OffsetsForLeaderEpochTest {
//create a replica manager with 1 partition that has 0 replica
val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
replicaManager.getOrCreatePartition(tp)
@@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest {
//create a replica manager with 0 partition
val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
- QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+ QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
//Given