Posted to commits@kafka.apache.org by jq...@apache.org on 2017/07/22 19:36:03 UTC

[2/5] kafka git commit: KAFKA-4763; Handle disk failure for JBOD (KIP-112)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d7420dd..e856ca1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,16 +18,13 @@
 package kafka.server
 
 import java.util
-
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.common.KafkaStorageException
 import kafka.log.LogConfig
 import kafka.server.ReplicaFetcherThread._
 import kafka.server.epoch.LeaderEpochCache
 import org.apache.kafka.common.requests.EpochEndOffset._
-import kafka.utils.Exit
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
@@ -35,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
 import org.apache.kafka.common.utils.Time
-
 import scala.collection.JavaConverters._
 import scala.collection.{Map, mutable}
 
@@ -83,41 +79,35 @@ class ReplicaFetcherThread(name: String,
 
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
-    try {
-      val replica = replicaMgr.getReplica(topicPartition).get
-      val records = partitionData.toRecords
-
-      maybeWarnIfOversizedRecords(records, topicPartition)
-
-      if (fetchOffset != replica.logEndOffset.messageOffset)
-        throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
-      if (logger.isTraceEnabled)
-        trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
-
-      // Append the leader's messages to the log
-      replica.log.get.appendAsFollower(records)
-
-      if (logger.isTraceEnabled)
-        trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-          .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
-      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
-      val leaderLogStartOffset = partitionData.logStartOffset
-      // for the follower replica, we do not need to keep
-      // its segment base offset the physical position,
-      // these values will be computed upon making the leader
-      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
-      replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
-      if (logger.isTraceEnabled)
-        trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
-      if (quota.isThrottled(topicPartition))
-        quota.record(records.sizeInBytes)
-      replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
-    } catch {
-      case e: KafkaStorageException =>
-        fatal(s"Disk error while replicating data for $topicPartition", e)
-        Exit.halt(1)
-    }
+    val replica = replicaMgr.getReplica(topicPartition).get
+    val records = partitionData.toRecords
+
+    maybeWarnIfOversizedRecords(records, topicPartition)
+
+    if (fetchOffset != replica.logEndOffset.messageOffset)
+      throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
+    if (logger.isTraceEnabled)
+      trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
+        .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+
+    // Append the leader's messages to the log
+    replica.log.get.appendAsFollower(records)
+
+    if (logger.isTraceEnabled)
+      trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
+        .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
+    val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+    val leaderLogStartOffset = partitionData.logStartOffset
+    // for the follower replica, we do not need to keep
+    // its segment base offset or physical position;
+    // these values will be computed when it becomes the leader
+    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
+    if (logger.isTraceEnabled)
+      trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
+    if (quota.isThrottled(topicPartition))
+      quota.record(records.sizeInBytes)
+    replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
   }
 
   def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 853b7c4..40887be 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,14 +16,13 @@
  */
 package kafka.server
 
-import java.io.{File, IOException}
+import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
-import kafka.common.KafkaStorageException
 import kafka.controller.KafkaController
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
@@ -31,11 +30,12 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
+import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -123,6 +123,7 @@ object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
   val IsrChangePropagationBlackOut = 5000L
   val IsrChangePropagationInterval = 60000L
+  val OfflinePartition = new Partition("", -1, null, null, isOffline = true)
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -135,6 +136,7 @@ class ReplicaManager(val config: KafkaConfig,
                      quotaManager: ReplicationQuotaManager,
                      val brokerTopicStats: BrokerTopicStats,
                      val metadataCache: MetadataCache,
+                     logDirFailureChannel: LogDirFailureChannel,
                      val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
                      val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                      val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
@@ -150,9 +152,10 @@ class ReplicaManager(val config: KafkaConfig,
            quotaManager: ReplicationQuotaManager,
            brokerTopicStats: BrokerTopicStats,
            metadataCache: MetadataCache,
+           logDirFailureChannel: LogDirFailureChannel,
            threadNamePrefix: Option[String] = None) {
     this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
-      quotaManager, brokerTopicStats, metadataCache,
+      quotaManager, brokerTopicStats, metadataCache, logDirFailureChannel,
       DelayedOperationPurgatory[DelayedProduce](
         purgatoryName = "Produce", brokerId = config.brokerId,
         purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
@@ -173,13 +176,27 @@ class ReplicaManager(val config: KafkaConfig,
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+  @volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
+    (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
+
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+  private var logDirFailureHandler: LogDirFailureHandler = null
+
+  private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
+    override def doWork() {
+      val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent()
+      if (haltBrokerOnDirFailure) {
+        fatal(s"Halting broker because dir $newOfflineLogDir is offline")
+        Exit.halt(1)
+      }
+      handleLogDirFailure(newOfflineLogDir)
+    }
+  }
 
   val leaderCount = newGauge(
     "LeaderCount",
@@ -193,6 +210,12 @@ class ReplicaManager(val config: KafkaConfig,
       def value = allPartitions.size
     }
   )
+  val offlineReplicaCount = newGauge(
+    "OfflineReplicaCount",
+    new Gauge[Int] {
+      def value = allPartitions.values.count(_ eq ReplicaManager.OfflinePartition)
+    }
+  )
   val underReplicatedPartitions = newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
@@ -277,20 +300,30 @@ class ReplicaManager(val config: KafkaConfig,
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+    val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_0_11_1_IV0
+    logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
+    logDirFailureHandler.start()
   }
 
   def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors  = {
     stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
     val error = Errors.NONE
     getPartition(topicPartition) match {
-      case Some(_) =>
+      case Some(partition) =>
         if (deletePartition) {
+          if (partition eq ReplicaManager.OfflinePartition)
+            throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
           val removedPartition = allPartitions.remove(topicPartition)
           if (removedPartition != null) {
-            removedPartition.delete() // this will delete the local log
-            val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
+            val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
             if (!topicHasPartitions)
               brokerTopicStats.removeMetrics(topicPartition.topic)
+            // this will delete the local log. This call may throw an exception if the log is in an offline directory
+            removedPartition.delete()
+          } else if (logManager.getLog(topicPartition).isDefined) {
+            // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
+            // This could happen when topic is being deleted while broker is down and recovers.
+            logManager.asyncDelete(topicPartition)
           }
         }
       case None =>
@@ -317,8 +350,14 @@ class ReplicaManager(val config: KafkaConfig,
         // First stop fetchers for all partitions, then stop the corresponding replicas
         replicaFetcherManager.removeFetcherForPartitions(partitions)
         for (topicPartition <- partitions){
-          val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
-          responseMap.put(topicPartition, error)
+          try {
+            val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
+            responseMap.put(topicPartition, error)
+          } catch {
+            case e: KafkaStorageException =>
+              stateChangeLogger.error(s"Broker $localBrokerId ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for partition $topicPartition due to storage exception", e)
+              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+          }
         }
         (responseMap, Errors.NONE)
       }
@@ -332,8 +371,15 @@ class ReplicaManager(val config: KafkaConfig,
     Option(allPartitions.get(topicPartition))
 
   def getReplicaOrException(topicPartition: TopicPartition): Replica = {
-    getReplica(topicPartition).getOrElse {
-      throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
+    getPartition(topicPartition) match {
+      case Some(partition) =>
+        if (partition eq ReplicaManager.OfflinePartition)
+          throw new KafkaStorageException(s"Replica $localBrokerId is in an offline log directory for partition $topicPartition")
+        else
+          partition.getReplica(localBrokerId).getOrElse(
+            throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition"))
+      case None =>
+        throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
     }
   }
 
@@ -343,7 +389,9 @@ class ReplicaManager(val config: KafkaConfig,
       case None =>
         throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId")
       case Some(partition) =>
-        partition.leaderReplicaIfLocal match {
+        if (partition eq ReplicaManager.OfflinePartition)
+          throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
+        else partition.leaderReplicaIfLocal match {
           case Some(leaderReplica) => leaderReplica
           case None =>
             throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
@@ -352,10 +400,17 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] =
-    getPartition(topicPartition).flatMap(_.getReplica(replicaId))
+    getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(replicaId))
 
   def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId)
 
+  def getLogDir(topicPartition: TopicPartition): Option[String] = {
+    getReplica(topicPartition).flatMap(_.log) match {
+      case Some(log) => Some(log.dir.getParent)
+      case None => None
+    }
+  }
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -422,8 +477,14 @@ class ReplicaManager(val config: KafkaConfig,
         (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
       } else {
         try {
-          val partition = getPartition(topicPartition).getOrElse(
-            throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+          val partition = getPartition(topicPartition) match {
+            case Some(p) =>
+              if (p eq ReplicaManager.OfflinePartition)
+                throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId))
+              p
+            case None =>
+              throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
+          }
           val convertedOffset =
             if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
               partition.leaderReplicaIfLocal match {
@@ -443,14 +504,11 @@ class ReplicaManager(val config: KafkaConfig,
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
-          case e: KafkaStorageException =>
-            fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
-            Runtime.getRuntime.halt(1)
-            (topicPartition, null)
           case e@ (_: UnknownTopicOrPartitionException |
                    _: NotLeaderForPartitionException |
                    _: OffsetOutOfRangeException |
                    _: PolicyViolationException |
+                   _: KafkaStorageException |
                    _: NotEnoughReplicasException) =>
             (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
           case t: Throwable =>
@@ -543,6 +601,8 @@ class ReplicaManager(val config: KafkaConfig,
           val partitionOpt = getPartition(topicPartition)
           val info = partitionOpt match {
             case Some(partition) =>
+              if (partition eq ReplicaManager.OfflinePartition)
+                throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
               partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
 
             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
@@ -567,15 +627,12 @@ class ReplicaManager(val config: KafkaConfig,
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
-          case e: KafkaStorageException =>
-            fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
-            Exit.halt(1)
-            (topicPartition, null)
           case e@ (_: UnknownTopicOrPartitionException |
                    _: NotLeaderForPartitionException |
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
+                   _: KafkaStorageException |
                    _: InvalidTimestampException) =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
@@ -747,6 +804,7 @@ class ReplicaManager(val config: KafkaConfig,
         case e@ (_: UnknownTopicOrPartitionException |
                  _: NotLeaderForPartitionException |
                  _: ReplicaNotAvailableException |
+                 _: KafkaStorageException |
                  _: OffsetOutOfRangeException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         highWatermark = -1L,
@@ -793,7 +851,7 @@ class ReplicaManager(val config: KafkaConfig,
    *  the quota is exceeded and the replica is not in sync.
    */
   def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = getPartition(topicPartition).flatMap { partition =>
+    val isReplicaInSync = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap { partition =>
       partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
     }.getOrElse(false)
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
@@ -819,7 +877,8 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
+  def becomeLeaderOrFollower(correlationId: Int,
+                             leaderAndISRRequest: LeaderAndIsrRequest,
                              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -842,9 +901,14 @@ class ReplicaManager(val config: KafkaConfig,
         leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch
-          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+          if (partition eq ReplicaManager.OfflinePartition) {
+            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+              "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory")
+              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+          } else if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+            // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
             if(stateInfo.replicas.contains(localBrokerId))
               partitionState.put(partition, stateInfo)
             else {
@@ -878,6 +942,17 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
 
+        leaderAndISRRequest.partitionStates.asScala.keys.foreach( topicPartition =>
+          /*
+           * If there is an offline log directory, a Partition object may have been created by getOrCreatePartition()
+           * before getOrCreateReplica() failed to create the local replica due to a KafkaStorageException.
+           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+           * We need to map this topic-partition to OfflinePartition instead.
+           */
+          if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
+            allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+        )
+
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
         if (!hwThreadInitialized) {
@@ -885,7 +960,6 @@ class ReplicaManager(val config: KafkaConfig,
           hwThreadInitialized = true
         }
         replicaFetcherManager.shutdownIdleFetcherThreads()
-
         onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
         BecomeLeaderOrFollowerResult(responseMap, Errors.NONE)
       }
@@ -926,18 +1000,27 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
-        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
-          partitionsToMakeLeaders += partition
-        else
-          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
-            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
-            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
-      }
-      partitionsToMakeLeaders.foreach { partition =>
-        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
-          "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+        try {
+          if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
+            partitionsToMakeLeaders += partition
+            stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
+              "%d epoch %d with correlation id %d for partition %s")
+              .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+          } else
+            stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
+              "controller %d epoch %d for partition %s since it is already the leader for the partition.")
+              .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+        } catch {
+          case e: KafkaStorageException =>
+            stateChangeLogger.error(("Broker %d skipped the become-leader state change with correlation id %d from " +
+              "controller %d epoch %d for partition %s since the replica for the partition is offline due to disk error %s.")
+              .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition, e))
+            val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+            error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
+            responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+        }
       }
+
     } catch {
       case e: Throwable =>
         partitionState.keys.foreach { partition =>
@@ -996,27 +1079,37 @@ class ReplicaManager(val config: KafkaConfig,
 
       // TODO: Delete leaders from LeaderAndIsrRequest
       partitionState.foreach{ case (partition, partitionStateInfo) =>
-        val newLeaderBrokerId = partitionStateInfo.leader
-        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
-          // Only change partition state when the leader is available
-          case Some(_) =>
-            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
-              partitionsToMakeFollower += partition
-            else
-              stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
-                "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
+        try {
+          val newLeaderBrokerId = partitionStateInfo.leader
+          metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
+            // Only change partition state when the leader is available
+            case Some(_) =>
+              if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+                partitionsToMakeFollower += partition
+              else
+                stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
+                  "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
+                  .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
+                    partition.topicPartition, newLeaderBrokerId))
+            case None =>
+              // The leader broker should always be present in the metadata cache.
+              // If not, we should record the error message and abort the transition process for this partition
+              stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
+                " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
                 .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
-                partition.topicPartition, newLeaderBrokerId))
-          case None =>
-            // The leader broker should always be present in the metadata cache.
-            // If not, we should record the error message and abort the transition process for this partition
-            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
-              " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
-              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
-              partition.topicPartition, newLeaderBrokerId))
-            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
-            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
-            partition.getOrCreateReplica()
+                  partition.topicPartition, newLeaderBrokerId))
+              // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+              // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+              partition.getOrCreateReplica(isNew = partitionStateInfo.isNew)
+          }
+        } catch {
+          case e: KafkaStorageException =>
+            stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " +
+              "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s")
+              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, partition.topic, partition.partitionId, e))
+            val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+            error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
+            responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
         }
       }
 
@@ -1080,7 +1173,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
   private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
@@ -1088,7 +1181,8 @@ class ReplicaManager(val config: KafkaConfig,
     readResults.foreach { case (topicPartition, readResult) =>
       getPartition(topicPartition) match {
         case Some(partition) =>
-          partition.updateReplicaLogReadResult(replicaId, readResult)
+          if (partition ne ReplicaManager.OfflinePartition)
+            partition.updateReplicaLogReadResult(replicaId, readResult)
 
           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets have moved
@@ -1100,33 +1194,86 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def getLeaderPartitions: List[Partition] =
-    allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
+    allPartitions.values.filter(partition => (partition ne ReplicaManager.OfflinePartition) && partition.leaderReplicaIfLocal.isDefined).toList
 
   def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = {
-    getPartition(topicPartition).flatMap{ partition =>
-      partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
+    getPartition(topicPartition) match {
+      case Some(partition) =>
+        if (partition eq ReplicaManager.OfflinePartition)
+          None
+        else
+          partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
+      case None => None
     }
   }
 
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
+    val replicas = allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId))
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
     for ((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
-        highWatermarkCheckpoints(dir).write(hwms)
+        highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
       } catch {
-        case e: IOException =>
-          fatal("Error writing to highwatermark file: ", e)
-          Exit.halt(1)
+        case e: KafkaStorageException =>
+          error(s"Error while writing to highwatermark file in directory $dir", e)
       }
     }
   }
 
+  def handleLogDirFailure(dir: String) {
+    if (!logManager.isLogDirOnline(dir))
+      return
+
+    info(s"Stopping serving replicas in dir $dir")
+    replicaStateChangeLock synchronized {
+      val newOfflinePartitions = allPartitions.values.filter { partition =>
+        if (partition eq ReplicaManager.OfflinePartition)
+          false
+        else partition.getReplica(config.brokerId) match {
+          case Some(replica) =>
+            replica.log.isDefined && replica.log.get.dir.getParent == dir
+          case None => false
+        }
+      }.map(_.topicPartition)
+
+      info(s"Partitions ${newOfflinePartitions.mkString(",")} are offline due to failure on log directory $dir")
+
+      newOfflinePartitions.foreach { topicPartition =>
+        val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+        partition.removePartitionMetrics()
+      }
+
+      newOfflinePartitions.map(_.topic).toSet.foreach { topic: String =>
+        val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
+        if (!topicHasPartitions)
+          brokerTopicStats.removeMetrics(topic)
+      }
+
+      replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions.toSet)
+      highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
+      info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s"
+        .format(localBrokerId, newOfflinePartitions.mkString(", "), dir))
+    }
+    logManager.handleLogDirFailure(dir)
+    LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
+    info(s"Stopped serving replicas in dir $dir")
+  }
+
+  def removeMetrics() {
+    removeMetric("LeaderCount")
+    removeMetric("PartitionCount")
+    removeMetric("OfflineReplicaCount")
+    removeMetric("UnderReplicatedPartitions")
+  }
+
   // High watermark do not need to be checkpointed only when under unit tests
   def shutdown(checkpointHW: Boolean = true) {
     info("Shutting down")
+    removeMetrics()
+    if (logDirFailureHandler != null)
+      logDirFailureHandler.shutdown()
     replicaFetcherManager.shutdown()
     delayedFetchPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
@@ -1144,7 +1291,10 @@ class ReplicaManager(val config: KafkaConfig,
     requestedEpochInfo.map { case (tp, leaderEpoch) =>
       val epochEndOffset = getPartition(tp) match {
         case Some(partition) =>
-          partition.lastOffsetForLeaderEpoch(leaderEpoch)
+          if (partition eq ReplicaManager.OfflinePartition)
+            new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET)
+          else
+            partition.lastOffsetForLeaderEpoch(leaderEpoch)
         case None =>
           new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
       }

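Note: the LogDirFailureChannel type referenced throughout this file is added in another part of this patch series, so only its two calls are visible here: maybeAddLogFailureEvent(dir) from I/O paths and takeNextLogFailureEvent() from the LogDirFailureHandler thread above. A minimal sketch consistent with those calls, assuming each failed directory is reported at most once and the handler thread blocks until a failure arrives (illustrative only, not the committed implementation):

  import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

  class LogDirFailureChannel(logDirNum: Int) {
    // remembers directories that were already reported, so each one is enqueued only once
    private val offlineLogDirs = new ConcurrentHashMap[String, String]()
    // bounded queue drained by ReplicaManager's LogDirFailureHandler thread
    private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)

    // called from I/O paths (e.g. CheckpointFile below) when an IOException hits a log directory
    def maybeAddLogFailureEvent(logDir: String): Unit = {
      if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
        offlineLogDirQueue.add(logDir)
    }

    // blocks until the next failed log directory is available
    def takeNextLogFailureEvent(): String = offlineLogDirQueue.take()
  }
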
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index cc50620..7b67559 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -18,9 +18,13 @@ package kafka.server.checkpoints
 
 import java.io._
 import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, FileSystems, Files, Paths}
-import kafka.utils.{Exit, Logging}
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+
+import kafka.server.LogDirFailureChannel
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.utils.Utils
+
 import scala.collection.{Seq, mutable}
 
 trait CheckpointFileFormatter[T]{
@@ -29,86 +33,94 @@ trait CheckpointFileFormatter[T]{
   def fromLine(line: String): Option[T]
 }
 
-class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileFormatter[T]) extends Logging {
+class CheckpointFile[T](val file: File,
+                        version: Int,
+                        formatter: CheckpointFileFormatter[T],
+                        logDirFailureChannel: LogDirFailureChannel,
+                        logDir: String) extends Logging {
   private val path = file.toPath.toAbsolutePath
   private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
-  
+
   try Files.createFile(file.toPath) // create the file if it doesn't exist
   catch { case _: FileAlreadyExistsException => }
 
   def write(entries: Seq[T]) {
     lock synchronized {
-      // write to temp file and then swap with the existing file
-      val fileOutputStream = new FileOutputStream(tempPath.toFile)
-      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
       try {
-        writer.write(version.toString)
-        writer.newLine()
-
-        writer.write(entries.size.toString)
-        writer.newLine()
+        // write to temp file and then swap with the existing file
+        val fileOutputStream = new FileOutputStream(tempPath.toFile)
+        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+        try {
+          writer.write(version.toString)
+          writer.newLine()
 
-        entries.foreach { entry =>
-          writer.write(formatter.toLine(entry))
+          writer.write(entries.size.toString)
           writer.newLine()
+
+          entries.foreach { entry =>
+            writer.write(formatter.toLine(entry))
+            writer.newLine()
+          }
+
+          writer.flush()
+          fileOutputStream.getFD().sync()
+        } finally {
+          writer.close()
         }
 
-        writer.flush()
-        fileOutputStream.getFD().sync()
+        Utils.atomicMoveWithFallback(tempPath, path)
       } catch {
-        case e: FileNotFoundException =>
-          if (FileSystems.getDefault.isReadOnly) {
-            fatal(s"Halting writes to checkpoint file (${file.getAbsolutePath}) because the underlying file system is inaccessible: ", e)
-            Exit.halt(1)
-          }
-          throw e
-      } finally {
-        writer.close()
+        case e: IOException =>
+          logDirFailureChannel.maybeAddLogFailureEvent(logDir)
+          throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}", e)
       }
-
-      Utils.atomicMoveWithFallback(tempPath, path)
     }
   }
 
   def read(): Seq[T] = {
     def malformedLineException(line: String) =
       new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
-
     lock synchronized {
-      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
-      var line: String = null
       try {
-        line = reader.readLine()
-        if (line == null)
-          return Seq.empty
-        line.toInt match {
-          case fileVersion if fileVersion == version =>
-            line = reader.readLine()
-            if (line == null)
-              return Seq.empty
-            val expectedSize = line.toInt
-            val entries = mutable.Buffer[T]()
-            line = reader.readLine()
-            while (line != null) {
-              val entry = formatter.fromLine(line)
-              entry match {
-                case Some(e) =>
-                  entries += e
-                  line = reader.readLine()
-                case _ => throw malformedLineException(line)
+        val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
+        var line: String = null
+        try {
+          line = reader.readLine()
+          if (line == null)
+            return Seq.empty
+          line.toInt match {
+            case fileVersion if fileVersion == version =>
+              line = reader.readLine()
+              if (line == null)
+                return Seq.empty
+              val expectedSize = line.toInt
+              val entries = mutable.Buffer[T]()
+              line = reader.readLine()
+              while (line != null) {
+                val entry = formatter.fromLine(line)
+                entry match {
+                  case Some(e) =>
+                    entries += e
+                    line = reader.readLine()
+                  case _ => throw malformedLineException(line)
+                }
               }
-            }
-            if (entries.size != expectedSize)
-              throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
-            entries
-          case _ =>
-            throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
+              if (entries.size != expectedSize)
+                throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
+              entries
+            case _ =>
+              throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
+          }
+        } catch {
+          case _: NumberFormatException => throw malformedLineException(line)
+        } finally {
+          reader.close()
         }
       } catch {
-        case _: NumberFormatException => throw malformedLineException(line)
-      } finally {
-        reader.close()
+        case e: IOException =>
+          logDirFailureChannel.maybeAddLogFailureEvent(logDir)
+          throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}", e)
       }
     }
   }

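With this change a checkpoint I/O error no longer halts the broker: CheckpointFile reports the owning log directory to the LogDirFailureChannel and rethrows the error as a KafkaStorageException for the caller to handle. A usage sketch against the classes above (the helper itself is hypothetical, not part of the patch):

  import java.io.File
  import kafka.server.LogDirFailureChannel
  import kafka.server.checkpoints.OffsetCheckpointFile
  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.errors.KafkaStorageException

  // Hypothetical helper: persist high watermarks for one log directory and treat a storage
  // error as "this directory just went offline" instead of a fatal broker error.
  def writeHighWatermarks(logDir: File,
                          hwms: Map[TopicPartition, Long],
                          failureChannel: LogDirFailureChannel): Boolean = {
    val checkpoint = new OffsetCheckpointFile(new File(logDir, "replication-offset-checkpoint"), failureChannel)
    try {
      checkpoint.write(hwms)
      true
    } catch {
      case _: KafkaStorageException =>
        // CheckpointFile has already queued logDir on the failure channel before throwing,
        // so the caller just reports failure and lets the LogDirFailureHandler thread react.
        false
    }
  }
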
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index d32d30f..a8db688 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
 import java.io._
 import java.util.regex.Pattern
 
+import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
 
 import scala.collection._
@@ -55,10 +56,10 @@ object LeaderEpochCheckpointFile {
 /**
   * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
   */
-class LeaderEpochCheckpointFile(val file: File) extends LeaderEpochCheckpoint {
+class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
   import LeaderEpochCheckpointFile._
 
-  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter)
+  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
 
   def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 5f5dc97..9cd0963 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
 import java.io._
 import java.util.regex.Pattern
 
+import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
 import org.apache.kafka.common.TopicPartition
 
@@ -51,9 +52,9 @@ trait OffsetCheckpoint {
 /**
   * This class persists a map of (Partition => Offsets) to a file (for a certain replica)
   */
-class OffsetCheckpointFile(val f: File) {
+class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) {
   val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
-    OffsetCheckpointFile.Formatter)
+    OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent)
 
   def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
new file mode 100644
index 0000000..0bbc47d
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.controller.LogDirEventNotificationListener
+import scala.collection.Map
+
+object LogDirUtils extends Logging {
+
+  private val LogDirEventNotificationPrefix = "log_dir_event_"
+  val LogDirFailureEvent = 1
+
+  def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) {
+    val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath(
+      ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId))
+    debug("Added " + logDirEventNotificationPath + " for broker " + brokerId)
+  }
+
+  private def logDirFailureEventZkData(brokerId: Int): String = {
+    Json.encode(Map("version" -> LogDirEventNotificationListener.version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+  }
+
+  def deleteLogDirEvents(zkUtils: ZkUtils) {
+    val sequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).toSet
+    sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
+  }
+
+  def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = {
+    val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child
+    val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
+    if (jsonOpt.isDefined) {
+      val json = Json.parseFull(jsonOpt.get)
+
+      json match {
+        case Some(m) =>
+          val brokerAndEventType = m.asInstanceOf[Map[String, Any]]
+          val brokerId = brokerAndEventType.get("broker").get.asInstanceOf[Int]
+          val eventType = brokerAndEventType.get("event").get.asInstanceOf[Int]
+          if (eventType != LogDirFailureEvent)
+            throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
+          Some(brokerId)
+        case None =>
+          error("Invalid LogDirEvent JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
+          None
+      }
+    } else {
+      None
+    }
+  }
+
+}

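For reference, propagateLogDirEvent writes a sequential persistent znode under /log_dir_event_notification, which the controller picks up and later deletes. A usage sketch, assuming the standard ZkUtils factory and a notification version of 1:

  import kafka.utils.{LogDirUtils, ZkUtils}

  // connect string, timeouts and security flag are illustrative values
  val zkUtils = ZkUtils("localhost:2181", 6000, 6000, false)

  // Creates a znode such as /log_dir_event_notification/log_dir_event_0000000000 whose data is
  // JSON along the lines of {"version":1,"broker":1,"event":1} (event 1 = LogDirFailureEvent).
  LogDirUtils.propagateLogDirEvent(zkUtils, brokerId = 1)

  // The controller processes the notifications and then clears them.
  LogDirUtils.deleteLogDirEvents(zkUtils)
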
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0035120..7d3529f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -51,6 +51,7 @@ object ZkUtils {
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
   val IsrChangeNotificationPath = "/isr_change_notification"
+  val LogDirEventNotificationPath = "/log_dir_event_notification"
   val KafkaAclPath = "/kafka-acl"
   val KafkaAclChangesPath = "/kafka-acl-changes"
 
@@ -75,7 +76,8 @@ object ZkUtils {
                               IsrChangeNotificationPath,
                               KafkaAclPath,
                               KafkaAclChangesPath,
-                              ProducerIdBlockPath)
+                              ProducerIdBlockPath,
+                              LogDirEventNotificationPath)
 
   // Important: it is necessary to add any new top level Zookeeper path that contains
   //            sensitive information that should not be world readable to the Seq
@@ -235,7 +237,8 @@ class ZkUtils(val zkClient: ZkClient,
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath,
-                              ProducerIdBlockPath)
+                              ProducerIdBlockPath,
+                              LogDirEventNotificationPath)
 
   // Visible for testing
   val zkPath = new ZkPath(zkClient)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 09ff9be..2b134fe 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,7 +44,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -272,7 +271,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createUpdateMetadataRequest = {
-    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava
+    val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState(
+      Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
@@ -303,8 +303,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
 
   private def leaderAndIsrRequest = {
-    new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
-      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
+    new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
+      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 921c2b4..5e3c7ab 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -37,6 +37,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   val producerCount: Int
   val consumerCount: Int
   val serverCount: Int
+  var logDirCount: Int = 1
   lazy val producerConfig = new Properties
   lazy val consumerConfig = new Properties
   lazy val serverConfig = new Properties
@@ -46,7 +47,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
-      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
@@ -84,7 +85,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
                                   saslProperties = this.clientSaslProperties,
                                   props = Some(producerConfig))
   }
-  
+
   def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
       TestUtils.createNewConsumer(brokerList,
                                   securityProtocol = this.securityProtocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
new file mode 100644
index 0000000..6942df0
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import kafka.controller.{OfflineReplica, PartitionAndReplica}
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
+import org.junit.{Before, Test}
+import org.junit.Assert.assertTrue
+import org.junit.Assert.assertEquals
+
+/**
+  * Test whether clients can produce and consume when there is a log directory failure
+  */
+class LogDirFailureTest extends IntegrationTestHarness {
+  val producerCount: Int = 1
+  val consumerCount: Int = 1
+  val serverCount: Int = 2
+  private val topic = "topic"
+
+  this.logDirCount = 2
+  this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+  this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
+  this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    TestUtils.createTopic(zkUtils, topic, 1, 2, servers = servers)
+  }
+
+  @Test
+  def testProduceAfterLogDirFailure() {
+
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+    val producer = producers.head
+    val partition = new TopicPartition(topic, 0)
+    val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
+
+    val leaderServerId = producer.partitionsFor(topic).get(0).leader().id()
+    val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+
+    // The first send() should succeed
+    producer.send(record).get()
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0).count() == 1
+    }, "Expected the first message", 3000L)
+
+    // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
+    val replica = leaderServer.replicaManager.getReplica(partition)
+    val logDir = replica.get.log.get.dir.getParentFile
+    CoreUtils.swallow(Utils.delete(logDir))
+    logDir.createNewFile()
+    assertTrue(logDir.isFile)
+
+    // Wait for the ReplicaHighWatermarkCheckpoint to run so that the log directory of the partition is marked offline
+    TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L)
+    assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
+
+    // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
+    try {
+      producer.send(record).get(6000, TimeUnit.MILLISECONDS)
+      fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
+    } catch {
+      case e: ExecutionException =>
+        e.getCause match {
+          case t: KafkaStorageException =>
+          case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
+          case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
+        }
+      case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}")
+    }
+
+    // Wait for producer to update metadata for the partition
+    TestUtils.waitUntilTrue(() => {
+      // ProduceResponse may contain KafkaStorageException and trigger metadata update
+      producer.send(record)
+      producer.partitionsFor(topic).get(0).leader().id() != leaderServerId
+    }, "Expected new leader for the partition", 6000L)
+
+    // Consumer should receive some messages
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0).count() > 0
+    }, "Expected some messages", 3000L)
+
+    // There should be no remaining LogDirEventNotification znode
+    assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty)
+
+    // The controller should have marked the replica on the original leader as offline
+    val controllerServer = servers.find(_.kafkaController.isActive).get
+    val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+    assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
+  }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0e57e53..760cc39 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       val recordMetadata = result.get()
       error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
       servers.foreach { server =>
-        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+        error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
       }
       fail("Should not be able to send messages from a fenced producer.")
     } catch {
@@ -436,7 +436,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       val recordMetadata = result.get()
       error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
       servers.foreach { case (server) =>
-        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+        error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
       }
       fail("Should not be able to send messages from a fenced producer.")
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 147e84a..ebe7223 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
 
       override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
         new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers.follower, new BrokerTopicStats, metadataCache) {
+          quotaManagers.follower, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
 
           override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
                                                              quotaManager: ReplicationQuotaManager) =

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a8ce17e..24e2920 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -97,7 +97,7 @@ class GroupMetadataManagerTest {
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
     EasyMock.replay(replicaManager)
-    
+
     groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -685,9 +685,9 @@ class GroupMetadataManagerTest {
     assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
     assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
     assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
-    assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
-    assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
-    assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
+    assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
+    assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
+    assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN_SERVER_ERROR)
     assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
   }
 
@@ -1311,7 +1311,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     capturedArgument
   }
-  
+
   private def expectAppendMessage(error: Errors) {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index df2f7df..6323d15 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -144,7 +144,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
 
   @Test
   def shouldThrowIllegalStateExceptionWhenUnknownError(): Unit = {
-    verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN)
+    verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN_SERVER_ERROR)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 6a35d41..e86e088 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -309,7 +309,7 @@ class TransactionStateManagerTest {
     transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]())
     transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
 
-    expectedError = Errors.UNKNOWN
+    expectedError = Errors.UNKNOWN_SERVER_ERROR
     var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index bf36199..d6f0a56 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.nio.file.Files
 import java.util.Properties
 
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
@@ -110,6 +110,7 @@ abstract class AbstractLogCleanerIntegrationTest {
     new LogCleaner(cleanerConfig,
       logDirs = Array(logDir),
       logs = logMap,
+      logDirFailureChannel = new LogDirFailureChannel(1),
       time = time)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 8a119c2..e569b29 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index b4c1790..f4eabc0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -218,7 +218,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
     logs.put(new TopicPartition("log", 0), log)
-    val cleanerManager = new LogCleanerManager(Array(logDir), logs)
+    val cleanerManager = new LogCleanerManager(Array(logDir), logs, null)
     cleanerManager
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 3e58c4d..689a032 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -881,7 +881,7 @@ class LogCleanerTest extends JUnitSuite {
     checkSegmentOrder(groups)
   }
 
-  /** 
+  /**
    * Following the loading of a log segment where the index file is zero sized,
    * the index returned would be the base offset.  Sometimes the log file would
    * contain data with offsets in excess of the baseOffset which would cause
@@ -1324,7 +1324,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
     lastOffset = offset
     map.put(keyFor(key), offset)
   }
-  
+
   override def get(key: ByteBuffer): Long = {
     val k = keyFor(key)
     if(map.containsKey(k))
@@ -1332,9 +1332,9 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
     else
       -1L
   }
-  
+
   override def clear(): Unit = map.clear()
-  
+
   override def size: Int = map.size
 
   override def latestOffset: Long = lastOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8b7819f..0826747 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,12 +21,10 @@ import java.io._
 import java.util.Properties
 
 import kafka.common._
-import kafka.server.FetchDataInfo
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -52,7 +50,7 @@ class LogManagerTest {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup()
-    logDir = logManager.logDirs(0)
+    logDir = logManager.liveLogDirs(0)
   }
 
   @After
@@ -60,7 +58,7 @@ class LogManagerTest {
     if(logManager != null)
       logManager.shutdown()
     Utils.delete(logDir)
-    logManager.logDirs.foreach(Utils.delete)
+    logManager.liveLogDirs.foreach(Utils.delete)
   }
 
   /**
@@ -68,7 +66,7 @@ class LogManagerTest {
    */
   @Test
   def testCreateLog() {
-    val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -90,7 +88,7 @@ class LogManagerTest {
    */
   @Test
   def testCleanupExpiredSegments() {
-    val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
     var offset = 0L
     for(_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -135,7 +133,7 @@ class LogManagerTest {
     logManager.startup()
 
     // create a log
-    val log = logManager.createLog(new TopicPartition(name, 0), config)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config)
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
@@ -175,7 +173,7 @@ class LogManagerTest {
   def testDoesntCleanLogsWithCompactDeletePolicy() {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
-    val log = logManager.createLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
@@ -204,7 +202,7 @@ class LogManagerTest {
 
     logManager = createLogManager()
     logManager.startup()
-    val log = logManager.createLog(new TopicPartition(name, 0), config)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -228,7 +226,7 @@ class LogManagerTest {
 
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
-      logManager.createLog(new TopicPartition("test", partition), logConfig)
+      logManager.getOrCreateLog(new TopicPartition("test", partition), logConfig)
       assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
       val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
       assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
@@ -286,7 +284,7 @@ class LogManagerTest {
 
   private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition],
                                        logManager: LogManager) {
-    val logs = topicPartitions.map(this.logManager.createLog(_, logConfig))
+    val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig))
     logs.foreach(log => {
       for (_ <- 0 until 50)
         log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -294,7 +292,7 @@ class LogManagerTest {
       log.flush()
     })
 
-    logManager.checkpointRecoveryPointOffsets()
+    logManager.checkpointLogRecoveryOffsets()
     val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
 
     topicPartitions.zip(logs).foreach {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 79fe220..30ccc8b 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConverters._
 import scala.collection._
 
 class LogSegmentTest {
-  
+
   val topicPartition = new TopicPartition("topic", 0)
   val segments = mutable.ArrayBuffer[LogSegment]()
   var logDir: File = _
@@ -52,7 +52,7 @@ class LogSegmentTest {
     segments += seg
     seg
   }
-  
+
   /* create a ByteBufferMessageSet for the given messages starting from the given offset */
   def records(offset: Long, records: String*): MemoryRecords = {
     MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, CompressionType.NONE, TimestampType.CREATE_TIME,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 008cd27..2213d09 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -254,7 +254,8 @@ class LogTest {
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
-      stateManager)
+      producerStateManager = stateManager,
+      logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
 
@@ -322,7 +323,8 @@ class LogTest {
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
-      stateManager)
+      producerStateManager = stateManager,
+      logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
   }
@@ -356,7 +358,8 @@ class LogTest {
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
-      stateManager)
+      producerStateManager = stateManager,
+      logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
     cleanShutdownFile.delete()
@@ -391,7 +394,8 @@ class LogTest {
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
-      stateManager)
+      producerStateManager = stateManager,
+      logDirFailureChannel = null)
 
     EasyMock.verify(stateManager)
     cleanShutdownFile.delete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index b6b40c2..5e63500 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
-import kafka.common._
 import kafka.cluster.Replica
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import java.util.concurrent.atomic.AtomicBoolean
@@ -35,24 +34,28 @@ class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
+  val zkUtils = EasyMock.createMock(classOf[ZkUtils])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)).toArray,
       cleanerConfig = CleanerConfig())
   }
-    
+
+  val logDirFailureChannels = configs map { config =>
+    new LogDirFailureChannel(config.logDirs.size)
+  }
+
   @After
   def teardown() {
-    for(manager <- logManagers; dir <- manager.logDirs)
+    for(manager <- logManagers; dir <- manager.liveLogDirs)
       Utils.delete(dir)
   }
 
   @Test
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
-    val zkUtils = EasyMock.createMock(classOf[ZkUtils])
     EasyMock.replay(zkUtils)
-    
+
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
@@ -61,7 +64,7 @@ class HighwatermarkPersistenceTest {
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
       logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
-      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -69,7 +72,7 @@ class HighwatermarkPersistenceTest {
       assertEquals(0L, fooPartition0Hw)
       val partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
       // create leader and follower replicas
-      val log0 = logManagers.head.createLog(new TopicPartition(topic, 0), LogConfig())
+      val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
       val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0))
       partition0.addReplicaIfNotExists(leaderReplicaPartition0)
       val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time)
@@ -96,7 +99,6 @@ class HighwatermarkPersistenceTest {
     val topic1 = "foo1"
     val topic2 = "foo2"
     // mock zkclient
-    val zkUtils = EasyMock.createMock(classOf[ZkUtils])
     EasyMock.replay(zkUtils)
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
@@ -106,7 +108,7 @@ class HighwatermarkPersistenceTest {
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
       scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
-      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -114,7 +116,7 @@ class HighwatermarkPersistenceTest {
       assertEquals(0L, topic1Partition0Hw)
       val topic1Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic1, 0))
       // create leader log
-      val topic1Log0 = logManagers.head.createLog(new TopicPartition(topic1, 0), LogConfig())
+      val topic1Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic1, 0), LogConfig())
       // create a local replica for topic1
       val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0))
       topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -130,7 +132,7 @@ class HighwatermarkPersistenceTest {
       // add another partition and set highwatermark
       val topic2Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic2, 0))
       // create leader log
-      val topic2Log0 = logManagers.head.createLog(new TopicPartition(topic2, 0), LogConfig())
+      val topic2Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic2, 0), LogConfig())
       // create a local replica for topic2
       val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0))
       topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
@@ -163,5 +165,5 @@ class HighwatermarkPersistenceTest {
     replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
       new TopicPartition(topic, partition), 0L)
   }
-  
+
 }