You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 15:17:58 UTC

kafka git commit: KAFKA-6164; Shutdown quota managers if other components fail to start

Repository: kafka
Updated Branches:
  refs/heads/trunk 4a55818bb -> fe3171844


KAFKA-6164; Shutdown quota managers if other components fail to start

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Viktor Somogyi <vi...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4181 from rajinisivaram/KAFKA-6164


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe317184
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe317184
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe317184

Branch: refs/heads/trunk
Commit: fe3171844b599a3e6c0cfe1e606a21eb4ef32ce6
Parents: 4a55818
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Nov 9 15:17:52 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 15:17:52 2017 +0000

----------------------------------------------------------------------
 .../scala/kafka/server/ClientQuotaManager.scala |  9 ++++---
 .../server/ClientRequestQuotaManager.scala      |  4 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  1 -
 .../main/scala/kafka/server/KafkaServer.scala   |  4 ++-
 .../main/scala/kafka/server/QuotaFactory.scala  |  8 +++---
 .../kafka/server/ClientQuotaManagerTest.scala   | 15 +++++------
 .../server/HighwatermarkPersistenceTest.scala   |  4 +--
 .../unit/kafka/server/ISRExpirationTest.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  8 +++++-
 .../kafka/server/ReplicaManagerQuotasTest.scala |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 10 ++++----
 .../unit/kafka/server/ServerShutdownTest.scala  | 26 ++++++++++++++++++--
 .../unit/kafka/server/SimpleFetchTest.scala     |  2 +-
 .../ThrottledResponseExpirationTest.scala       |  4 +--
 .../epoch/OffsetsForLeaderEpochTest.scala       |  6 ++---
 15 files changed, 69 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 96739f6..a6efa92 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -106,7 +106,8 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
 class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val quotaType: QuotaType,
-                         private val time: Time) extends Logging {
+                         private val time: Time,
+                         threadNamePrefix: String) extends Logging {
   private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
   private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
   @volatile private var quotaTypesEnabled =
@@ -115,7 +116,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess(lock, metrics)
-  val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
+  private[server] val throttledRequestReaper = new ThrottledRequestReaper(delayQueue, threadNamePrefix)
 
   private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
   delayQueueSensor.add(metrics.metricName("queue-size",
@@ -130,8 +131,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
    * Reaper thread that triggers callbacks on all throttled requests
    * @param delayQueue DelayQueue to dequeue from
    */
-  class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread(
-    "ThrottledRequestReaper-%s".format(quotaType), false) {
+  class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse], prefix: String) extends ShutdownableThread(
+    s"${prefix}ThrottledRequestReaper-${quotaType}", false) {
 
     override def doWork(): Unit = {
       val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index f454483..59fa421 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -26,7 +26,9 @@ import org.apache.kafka.common.utils.Time
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
                                 private val metrics: Metrics,
-                                private val time: Time) extends ClientQuotaManager(config, metrics, QuotaType.Request, time) {
+                                private val time: Time,
+                                threadNamePrefix: String)
+                                extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix) {
   val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
   def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3f8ac49..b9a1971 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -86,7 +86,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   def close() {
-    quotas.shutdown()
     info("Shutdown complete.")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index df36662..a0732fd 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -216,7 +216,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* register broker metrics */
         _brokerTopicStats = new BrokerTopicStats
 
-        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
+        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
@@ -556,6 +556,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         if (zkClient != null)
           CoreUtils.swallow(zkClient.close())
 
+        if (quotaManagers != null)
+          CoreUtils.swallow(quotaManagers.shutdown())
         if (metrics != null)
           CoreUtils.swallow(metrics.close())
         if (brokerTopicStats != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index 13cd010..01441b5 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -52,11 +52,11 @@ object QuotaFactory extends Logging {
     }
   }
 
-  def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time): QuotaManagers = {
+  def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
     QuotaManagers(
-      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time),
-      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
-      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
+      new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix),
+      new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix),
+      new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
       new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 31d32d2..1aabbb3 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -39,7 +39,7 @@ class ClientQuotaManagerTest {
   }
 
   private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient) {
-    val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time, "")
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
@@ -149,7 +149,8 @@ class ClientQuotaManagerTest {
 
   @Test
   def testQuotaConfigPrecedence() {
-    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue), newMetrics, QuotaType.Produce, time)
+    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
+        newMetrics, QuotaType.Produce, time, "")
 
     def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
       assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, clientId))
@@ -222,7 +223,7 @@ class ClientQuotaManagerTest {
   @Test
   def testQuotaViolation() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
@@ -270,7 +271,7 @@ class ClientQuotaManagerTest {
   @Test
   def testRequestPercentageQuotaViolation() {
     val metrics = newMetrics
-    val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
+    val quotaManager = new ClientRequestQuotaManager(config, metrics, time, "")
     quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
     def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
@@ -332,7 +333,7 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireThrottleTimeSensor() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
     try {
       clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
       // remove the throttle time sensor
@@ -351,7 +352,7 @@ class ClientQuotaManagerTest {
   @Test
   def testExpireQuotaSensors() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
     try {
       clientMetrics.maybeRecordAndThrottle("ANONYMOUS", "client1", 100, callback)
       // remove all the sensors
@@ -375,7 +376,7 @@ class ClientQuotaManagerTest {
   @Test
   def testClientIdNotSanitized() {
     val metrics = newMetrics
-    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time, "")
     val clientId = "client@#$%"
     try {
       clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 63aeffb..efae329 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -63,7 +63,7 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
-      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
     try {
@@ -108,7 +108,7 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
-      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 000dbb5..8212ed6 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -59,7 +59,7 @@ class IsrExpirationTest {
     EasyMock.replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+      QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats, new MetadataCache(configs.head.brokerId),
       new LogDirFailureChannel(configs.head.logDirs.size))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c3b9ceb..cabbd5d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -74,6 +74,12 @@ class KafkaApisTest {
   private val clusterId = "clusterId"
   private val time = new MockTime
 
+  @After
+  def tearDown() {
+    quotas.shutdown()
+    metrics.close()
+  }
+
   def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): KafkaApis = {
     val properties = TestUtils.createBrokerConfig(brokerId, "zk")
     properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index a21b240..38085b8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -183,7 +183,7 @@ class ReplicaManagerQuotasTest {
     replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time),
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     //create the two replicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 10a66bd..b9d884a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,7 +69,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
@@ -88,7 +88,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
@@ -106,7 +106,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
@@ -139,7 +139,7 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size))
 
     try {
@@ -611,7 +611,7 @@ class ReplicaManagerTest {
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
 
     new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, Option(this.getClass.getName))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 23c40fe..75adf55 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,9 +26,12 @@ import java.io.File
 
 import kafka.log.LogManager
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.I0Itec.zkclient.exception.ZkException
 import org.junit.{Before, Test}
 import org.junit.Assert._
+import scala.reflect.ClassTag
 
 class ServerShutdownTest extends ZooKeeperTestHarness {
   var config: KafkaConfig = null
@@ -127,7 +130,25 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = KafkaConfig.fromProps(newProps)
-    val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
+    verifyCleanShutdownAfterFailedStartup[ZkException](newConfig)
+  }
+
+  @Test
+  def testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
+    var server = new KafkaServer(config)
+    server.startup()
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    server.shutdown()
+    server.awaitShutdown()
+    config.logDirs.foreach { dirName =>
+      val partitionDir = new File(dirName, s"$topic-0")
+      partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
+    }
+    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](config)
+  }
+
+  private def verifyCleanShutdownAfterFailedStartup[E <: Exception](config: KafkaConfig)(implicit exceptionClassTag: ClassTag[E]) {
+    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     try {
       server.startup()
       fail("Expected KafkaServer setup to fail and throw exception")
@@ -136,7 +157,8 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
-      case _: org.I0Itec.zkclient.exception.ZkException =>
+      case e: Exception =>
+        assertTrue(s"Unexpected exception $e", exceptionClassTag.runtimeClass.isInstance(e))
         assertEquals(NotRunning.state, server.brokerState.currentState)
     }
     finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index c6d3384..92b230e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -112,7 +112,7 @@ class SimpleFetchTest {
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time), new BrokerTopicStats,
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     // add the partition with two replicas, both in ISR

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
index aae66d8..d7634c0 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -43,10 +43,10 @@ class ThrottledResponseExpirationTest {
 
   @Test
   def testExpire() {
-    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time)
+    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time, "")
 
     val delayQueue = new DelayQueue[ThrottledResponse]()
-    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
+    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue, "")
     try {
       // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
       delayQueue.add(new ThrottledResponse(time, 10, callback))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe317184/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 32bf29c..1c01d62 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -56,7 +56,7 @@ class OffsetsForLeaderEpochTest {
 
     // create a replica manager with 1 partition that has 1 replica
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     val partition = replicaManager.getOrCreatePartition(tp)
     val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
@@ -78,7 +78,7 @@ class OffsetsForLeaderEpochTest {
 
     //create a replica manager with 1 partition that has 0 replica
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     replicaManager.getOrCreatePartition(tp)
 
@@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest {
 
     //create a replica manager with 0 partition
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
-      QuotaFactory.instantiate(config, metrics, time), new BrokerTopicStats,
+      QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
 
     //Given