You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/07/22 19:36:03 UTC
[2/5] kafka git commit: KAFKA-4763;
Handle disk failure for JBOD (KIP-112)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d7420dd..e856ca1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,16 +18,13 @@
package kafka.server
import java.util
-
import kafka.admin.AdminUtils
import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica}
-import kafka.common.KafkaStorageException
import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache
import org.apache.kafka.common.requests.EpochEndOffset._
-import kafka.utils.Exit
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
@@ -35,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
import org.apache.kafka.common.utils.Time
-
import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}
@@ -83,41 +79,35 @@ class ReplicaFetcherThread(name: String,
// process fetched data
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
- try {
- val replica = replicaMgr.getReplica(topicPartition).get
- val records = partitionData.toRecords
-
- maybeWarnIfOversizedRecords(records, topicPartition)
-
- if (fetchOffset != replica.logEndOffset.messageOffset)
- throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
- if (logger.isTraceEnabled)
- trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
- .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
-
- // Append the leader's messages to the log
- replica.log.get.appendAsFollower(records)
-
- if (logger.isTraceEnabled)
- trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
- .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
- val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
- val leaderLogStartOffset = partitionData.logStartOffset
- // for the follower replica, we do not need to keep
- // its segment base offset the physical position,
- // these values will be computed upon making the leader
- replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
- replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
- if (logger.isTraceEnabled)
- trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
- if (quota.isThrottled(topicPartition))
- quota.record(records.sizeInBytes)
- replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
- } catch {
- case e: KafkaStorageException =>
- fatal(s"Disk error while replicating data for $topicPartition", e)
- Exit.halt(1)
- }
+ val replica = replicaMgr.getReplica(topicPartition).get
+ val records = partitionData.toRecords
+
+ maybeWarnIfOversizedRecords(records, topicPartition)
+
+ if (fetchOffset != replica.logEndOffset.messageOffset)
+ throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
+ if (logger.isTraceEnabled)
+ trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
+ .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+
+ // Append the leader's messages to the log
+ replica.log.get.appendAsFollower(records)
+
+ if (logger.isTraceEnabled)
+ trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
+ .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
+ val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+ val leaderLogStartOffset = partitionData.logStartOffset
+ // for the follower replica, we do not need to keep
+ // its segment base offset and physical position;
+ // these values will be computed upon making the leader
+ replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+ replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
+ if (logger.isTraceEnabled)
+ trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
+ if (quota.isThrottled(topicPartition))
+ quota.record(records.sizeInBytes)
+ replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
}
def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 853b7c4..40887be 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,14 +16,13 @@
*/
package kafka.server
-import java.io.{File, IOException}
+import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{Partition, Replica}
-import kafka.common.KafkaStorageException
import kafka.controller.KafkaController
import kafka.log.{Log, LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
@@ -31,11 +30,12 @@ import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION
+import org.apache.kafka.common.protocol.Errors.KAFKA_STORAGE_ERROR
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -123,6 +123,7 @@ object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
val IsrChangePropagationInterval = 60000L
+ val OfflinePartition = new Partition("", -1, null, null, isOffline = true)
}
class ReplicaManager(val config: KafkaConfig,
@@ -135,6 +136,7 @@ class ReplicaManager(val config: KafkaConfig,
quotaManager: ReplicationQuotaManager,
val brokerTopicStats: BrokerTopicStats,
val metadataCache: MetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
@@ -150,9 +152,10 @@ class ReplicaManager(val config: KafkaConfig,
quotaManager: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats,
metadataCache: MetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
threadNamePrefix: Option[String] = None) {
this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
- quotaManager, brokerTopicStats, metadataCache,
+ quotaManager, brokerTopicStats, metadataCache, logDirFailureChannel,
DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", brokerId = config.brokerId,
purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
@@ -173,13 +176,27 @@ class ReplicaManager(val config: KafkaConfig,
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+ @volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
+ (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
+
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+ private var logDirFailureHandler: LogDirFailureHandler = null
+
+ private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
+ override def doWork() {
+ val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent()
+ if (haltBrokerOnDirFailure) {
+ fatal(s"Halting broker because dir $newOfflineLogDir is offline")
+ Exit.halt(1)
+ }
+ handleLogDirFailure(newOfflineLogDir)
+ }
+ }
val leaderCount = newGauge(
"LeaderCount",
@@ -193,6 +210,12 @@ class ReplicaManager(val config: KafkaConfig,
def value = allPartitions.size
}
)
+ val offlineReplicaCount = newGauge(
+ "OfflineReplicaCount",
+ new Gauge[Int] {
+ def value = allPartitions.values.count(_ eq ReplicaManager.OfflinePartition)
+ }
+ )
val underReplicatedPartitions = newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
@@ -277,20 +300,30 @@ class ReplicaManager(val config: KafkaConfig,
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+ val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_0_11_1_IV0
+ logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
+ logDirFailureHandler.start()
}
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors = {
stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
val error = Errors.NONE
getPartition(topicPartition) match {
- case Some(_) =>
+ case Some(partition) =>
if (deletePartition) {
+ if (partition eq ReplicaManager.OfflinePartition)
+ throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
val removedPartition = allPartitions.remove(topicPartition)
if (removedPartition != null) {
- removedPartition.delete() // this will delete the local log
- val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
+ val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
if (!topicHasPartitions)
brokerTopicStats.removeMetrics(topicPartition.topic)
+ // this will delete the local log. This call may throw an exception if the log is in an offline directory
+ removedPartition.delete()
+ } else if (logManager.getLog(topicPartition).isDefined) {
+ // Delete the log and corresponding folders in case the replica manager doesn't hold them anymore.
+ // This could happen when a topic is being deleted while the broker is down and then recovers.
+ logManager.asyncDelete(topicPartition)
}
}
case None =>
@@ -317,8 +350,14 @@ class ReplicaManager(val config: KafkaConfig,
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions){
- val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
- responseMap.put(topicPartition, error)
+ try {
+ val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
+ responseMap.put(topicPartition, error)
+ } catch {
+ case e: KafkaStorageException =>
+ stateChangeLogger.error(s"Broker $localBrokerId ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for partition $topicPartition due to storage exception", e)
+ responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+ }
}
(responseMap, Errors.NONE)
}
@@ -332,8 +371,15 @@ class ReplicaManager(val config: KafkaConfig,
Option(allPartitions.get(topicPartition))
def getReplicaOrException(topicPartition: TopicPartition): Replica = {
- getReplica(topicPartition).getOrElse {
- throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
+ getPartition(topicPartition) match {
+ case Some(partition) =>
+ if (partition eq ReplicaManager.OfflinePartition)
+ throw new KafkaStorageException(s"Replica $localBrokerId is in an offline log directory for partition $topicPartition")
+ else
+ partition.getReplica(localBrokerId).getOrElse(
+ throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition"))
+ case None =>
+ throw new ReplicaNotAvailableException(s"Replica $localBrokerId is not available for partition $topicPartition")
}
}
@@ -343,7 +389,9 @@ class ReplicaManager(val config: KafkaConfig,
case None =>
throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId")
case Some(partition) =>
- partition.leaderReplicaIfLocal match {
+ if (partition eq ReplicaManager.OfflinePartition)
+ throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
+ else partition.leaderReplicaIfLocal match {
case Some(leaderReplica) => leaderReplica
case None =>
throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
@@ -352,10 +400,17 @@ class ReplicaManager(val config: KafkaConfig,
}
def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] =
- getPartition(topicPartition).flatMap(_.getReplica(replicaId))
+ getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(replicaId))
def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId)
+ def getLogDir(topicPartition: TopicPartition): Option[String] = {
+ getReplica(topicPartition).flatMap(_.log) match {
+ case Some(log) => Some(log.dir.getParent)
+ case None => None
+ }
+ }
+
/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -422,8 +477,14 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
- val partition = getPartition(topicPartition).getOrElse(
- throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+ val partition = getPartition(topicPartition) match {
+ case Some(p) =>
+ if (p eq ReplicaManager.OfflinePartition)
+ throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId))
+ p
+ case None =>
+ throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
+ }
val convertedOffset =
if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
partition.leaderReplicaIfLocal match {
@@ -443,14 +504,11 @@ class ReplicaManager(val config: KafkaConfig,
} catch {
// NOTE: Failed produce requests metric is not incremented for known exceptions
// it is supposed to indicate un-expected failures of a broker in handling a produce request
- case e: KafkaStorageException =>
- fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
- Runtime.getRuntime.halt(1)
- (topicPartition, null)
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: OffsetOutOfRangeException |
_: PolicyViolationException |
+ _: KafkaStorageException |
_: NotEnoughReplicasException) =>
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
case t: Throwable =>
@@ -543,6 +601,8 @@ class ReplicaManager(val config: KafkaConfig,
val partitionOpt = getPartition(topicPartition)
val info = partitionOpt match {
case Some(partition) =>
+ if (partition eq ReplicaManager.OfflinePartition)
+ throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
@@ -567,15 +627,12 @@ class ReplicaManager(val config: KafkaConfig,
} catch {
// NOTE: Failed produce requests metric is not incremented for known exceptions
// it is supposed to indicate un-expected failures of a broker in handling a produce request
- case e: KafkaStorageException =>
- fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
- Exit.halt(1)
- (topicPartition, null)
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: RecordTooLargeException |
_: RecordBatchTooLargeException |
_: CorruptRecordException |
+ _: KafkaStorageException |
_: InvalidTimestampException) =>
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
@@ -747,6 +804,7 @@ class ReplicaManager(val config: KafkaConfig,
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: ReplicaNotAvailableException |
+ _: KafkaStorageException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
@@ -793,7 +851,7 @@ class ReplicaManager(val config: KafkaConfig,
* the quota is exceeded and the replica is not in sync.
*/
def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
- val isReplicaInSync = getPartition(topicPartition).flatMap { partition =>
+ val isReplicaInSync = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap { partition =>
partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
}.getOrElse(false)
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
@@ -819,7 +877,8 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
+ def becomeLeaderOrFollower(correlationId: Int,
+ leaderAndISRRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -842,9 +901,14 @@ class ReplicaManager(val config: KafkaConfig,
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
val partition = getOrCreatePartition(topicPartition)
val partitionLeaderEpoch = partition.getLeaderEpoch
- // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
- // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
- if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+ if (partition eq ReplicaManager.OfflinePartition) {
+ stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+ "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory")
+ .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
+ responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+ } else if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+ // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
if(stateInfo.replicas.contains(localBrokerId))
partitionState.put(partition, stateInfo)
else {
@@ -878,6 +942,17 @@ class ReplicaManager(val config: KafkaConfig,
else
Set.empty[Partition]
+ leaderAndISRRequest.partitionStates.asScala.keys.foreach( topicPartition =>
+ /*
+ * If there is an offline log directory, a Partition object may have been created by getOrCreatePartition()
+ * before getOrCreateReplica() failed to create the local replica due to a KafkaStorageException.
+ * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+ * We need to map this topic-partition to OfflinePartition instead.
+ */
+ if (getReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
+ allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+ )
+
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
if (!hwThreadInitialized) {
@@ -885,7 +960,6 @@ class ReplicaManager(val config: KafkaConfig,
hwThreadInitialized = true
}
replicaFetcherManager.shutdownIdleFetcherThreads()
-
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
BecomeLeaderOrFollowerResult(responseMap, Errors.NONE)
}
@@ -926,18 +1000,27 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
- if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
- partitionsToMakeLeaders += partition
- else
- stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
- "controller %d epoch %d for partition %s since it is already the leader for the partition.")
- .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
- }
- partitionsToMakeLeaders.foreach { partition =>
- stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
- "%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+ try {
+ if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
+ partitionsToMakeLeaders += partition
+ stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
+ "%d epoch %d with correlation id %d for partition %s")
+ .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
+ } else
+ stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
+ "controller %d epoch %d for partition %s since it is already the leader for the partition.")
+ .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
+ } catch {
+ case e: KafkaStorageException =>
+ stateChangeLogger.error(("Broker %d skipped the become-leader state change with correlation id %d from " +
+ "controller %d epoch %d for partition %s since the replica for the partition is offline due to disk error %s.")
+ .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition, e))
+ val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+ error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
+ responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
+ }
}
+
} catch {
case e: Throwable =>
partitionState.keys.foreach { partition =>
@@ -996,27 +1079,37 @@ class ReplicaManager(val config: KafkaConfig,
// TODO: Delete leaders from LeaderAndIsrRequest
partitionState.foreach{ case (partition, partitionStateInfo) =>
- val newLeaderBrokerId = partitionStateInfo.leader
- metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
- // Only change partition state when the leader is available
- case Some(_) =>
- if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
- partitionsToMakeFollower += partition
- else
- stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
- "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
+ try {
+ val newLeaderBrokerId = partitionStateInfo.leader
+ metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
+ // Only change partition state when the leader is available
+ case Some(_) =>
+ if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+ partitionsToMakeFollower += partition
+ else
+ stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
+ "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
+ .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
+ partition.topicPartition, newLeaderBrokerId))
+ case None =>
+ // The leader broker should always be present in the metadata cache.
+ // If not, we should record the error message and abort the transition process for this partition
+ stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
+ " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
.format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
- partition.topicPartition, newLeaderBrokerId))
- case None =>
- // The leader broker should always be present in the metadata cache.
- // If not, we should record the error message and abort the transition process for this partition
- stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
- " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
- .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
- partition.topicPartition, newLeaderBrokerId))
- // Create the local replica even if the leader is unavailable. This is required to ensure that we include
- // the partition's high watermark in the checkpoint file (see KAFKA-1647)
- partition.getOrCreateReplica()
+ partition.topicPartition, newLeaderBrokerId))
+ // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+ // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+ partition.getOrCreateReplica(isNew = partitionStateInfo.isNew)
+ }
+ } catch {
+ case e: KafkaStorageException =>
+ stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s")
+ .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, partition.topic, partition.partitionId, e))
+ val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
+ error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
+ responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)
}
}
@@ -1080,7 +1173,7 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
- allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+ allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
@@ -1088,7 +1181,8 @@ class ReplicaManager(val config: KafkaConfig,
readResults.foreach { case (topicPartition, readResult) =>
getPartition(topicPartition) match {
case Some(partition) =>
- partition.updateReplicaLogReadResult(replicaId, readResult)
+ if (partition ne ReplicaManager.OfflinePartition)
+ partition.updateReplicaLogReadResult(replicaId, readResult)
// for producer requests with ack > 1, we need to check
// if they can be unblocked after some follower's log end offsets have moved
@@ -1100,33 +1194,86 @@ class ReplicaManager(val config: KafkaConfig,
}
private def getLeaderPartitions: List[Partition] =
- allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
+ allPartitions.values.filter(partition => (partition ne ReplicaManager.OfflinePartition) && partition.leaderReplicaIfLocal.isDefined).toList
def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = {
- getPartition(topicPartition).flatMap{ partition =>
- partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
+ getPartition(topicPartition) match {
+ case Some(partition) =>
+ if (partition eq ReplicaManager.OfflinePartition)
+ None
+ else
+ partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
+ case None => None
}
}
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {
- val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))
- val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
+ val replicas = allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId))
+ val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
for ((dir, reps) <- replicasByDir) {
val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
try {
- highWatermarkCheckpoints(dir).write(hwms)
+ highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
} catch {
- case e: IOException =>
- fatal("Error writing to highwatermark file: ", e)
- Exit.halt(1)
+ case e: KafkaStorageException =>
+ error(s"Error while writing to highwatermark file in directory $dir", e)
}
}
}
+ def handleLogDirFailure(dir: String) {
+ if (!logManager.isLogDirOnline(dir))
+ return
+
+ info(s"Stopping serving replicas in dir $dir")
+ replicaStateChangeLock synchronized {
+ val newOfflinePartitions = allPartitions.values.filter { partition =>
+ if (partition eq ReplicaManager.OfflinePartition)
+ false
+ else partition.getReplica(config.brokerId) match {
+ case Some(replica) =>
+ replica.log.isDefined && replica.log.get.dir.getParent == dir
+ case None => false
+ }
+ }.map(_.topicPartition)
+
+ info(s"Partitions ${newOfflinePartitions.mkString(",")} are offline due to failure on log directory $dir")
+
+ newOfflinePartitions.foreach { topicPartition =>
+ val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
+ partition.removePartitionMetrics()
+ }
+
+ newOfflinePartitions.map(_.topic).toSet.foreach { topic: String =>
+ val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
+ if (!topicHasPartitions)
+ brokerTopicStats.removeMetrics(topic)
+ }
+
+ replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions.toSet)
+ highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
+ info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s"
+ .format(localBrokerId, newOfflinePartitions.mkString(", "), dir))
+ }
+ logManager.handleLogDirFailure(dir)
+ LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
+ info(s"Stopped serving replicas in dir $dir")
+ }
+
+ def removeMetrics() {
+ removeMetric("LeaderCount")
+ removeMetric("PartitionCount")
+ removeMetric("OfflineReplicaCount")
+ removeMetric("UnderReplicatedPartitions")
+ }
+
// High watermark do not need to be checkpointed only when under unit tests
def shutdown(checkpointHW: Boolean = true) {
info("Shutting down")
+ removeMetrics()
+ if (logDirFailureHandler != null)
+ logDirFailureHandler.shutdown()
replicaFetcherManager.shutdown()
delayedFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
@@ -1144,7 +1291,10 @@ class ReplicaManager(val config: KafkaConfig,
requestedEpochInfo.map { case (tp, leaderEpoch) =>
val epochEndOffset = getPartition(tp) match {
case Some(partition) =>
- partition.lastOffsetForLeaderEpoch(leaderEpoch)
+ if (partition eq ReplicaManager.OfflinePartition)
+ new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET)
+ else
+ partition.lastOffsetForLeaderEpoch(leaderEpoch)
case None =>
new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index cc50620..7b67559 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -18,9 +18,13 @@ package kafka.server.checkpoints
import java.io._
import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, FileSystems, Files, Paths}
-import kafka.utils.{Exit, Logging}
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+
+import kafka.server.LogDirFailureChannel
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.utils.Utils
+
import scala.collection.{Seq, mutable}
trait CheckpointFileFormatter[T]{
@@ -29,86 +33,94 @@ trait CheckpointFileFormatter[T]{
def fromLine(line: String): Option[T]
}
-class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileFormatter[T]) extends Logging {
+class CheckpointFile[T](val file: File,
+ version: Int,
+ formatter: CheckpointFileFormatter[T],
+ logDirFailureChannel: LogDirFailureChannel,
+ logDir: String) extends Logging {
private val path = file.toPath.toAbsolutePath
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
-
+
try Files.createFile(file.toPath) // create the file if it doesn't exist
catch { case _: FileAlreadyExistsException => }
def write(entries: Seq[T]) {
lock synchronized {
- // write to temp file and then swap with the existing file
- val fileOutputStream = new FileOutputStream(tempPath.toFile)
- val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
try {
- writer.write(version.toString)
- writer.newLine()
-
- writer.write(entries.size.toString)
- writer.newLine()
+ // write to temp file and then swap with the existing file
+ val fileOutputStream = new FileOutputStream(tempPath.toFile)
+ val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+ try {
+ writer.write(version.toString)
+ writer.newLine()
- entries.foreach { entry =>
- writer.write(formatter.toLine(entry))
+ writer.write(entries.size.toString)
writer.newLine()
+
+ entries.foreach { entry =>
+ writer.write(formatter.toLine(entry))
+ writer.newLine()
+ }
+
+ writer.flush()
+ fileOutputStream.getFD().sync()
+ } finally {
+ writer.close()
}
- writer.flush()
- fileOutputStream.getFD().sync()
+ Utils.atomicMoveWithFallback(tempPath, path)
} catch {
- case e: FileNotFoundException =>
- if (FileSystems.getDefault.isReadOnly) {
- fatal(s"Halting writes to checkpoint file (${file.getAbsolutePath}) because the underlying file system is inaccessible: ", e)
- Exit.halt(1)
- }
- throw e
- } finally {
- writer.close()
+ case e: IOException =>
+ logDirFailureChannel.maybeAddLogFailureEvent(logDir)
+ throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}", e)
}
-
- Utils.atomicMoveWithFallback(tempPath, path)
}
}
def read(): Seq[T] = {
def malformedLineException(line: String) =
new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
-
lock synchronized {
- val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
- var line: String = null
try {
- line = reader.readLine()
- if (line == null)
- return Seq.empty
- line.toInt match {
- case fileVersion if fileVersion == version =>
- line = reader.readLine()
- if (line == null)
- return Seq.empty
- val expectedSize = line.toInt
- val entries = mutable.Buffer[T]()
- line = reader.readLine()
- while (line != null) {
- val entry = formatter.fromLine(line)
- entry match {
- case Some(e) =>
- entries += e
- line = reader.readLine()
- case _ => throw malformedLineException(line)
+ val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
+ var line: String = null
+ try {
+ line = reader.readLine()
+ if (line == null)
+ return Seq.empty
+ line.toInt match {
+ case fileVersion if fileVersion == version =>
+ line = reader.readLine()
+ if (line == null)
+ return Seq.empty
+ val expectedSize = line.toInt
+ val entries = mutable.Buffer[T]()
+ line = reader.readLine()
+ while (line != null) {
+ val entry = formatter.fromLine(line)
+ entry match {
+ case Some(e) =>
+ entries += e
+ line = reader.readLine()
+ case _ => throw malformedLineException(line)
+ }
}
- }
- if (entries.size != expectedSize)
- throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
- entries
- case _ =>
- throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
+ if (entries.size != expectedSize)
+ throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
+ entries
+ case _ =>
+ throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
+ }
+ } catch {
+ case _: NumberFormatException => throw malformedLineException(line)
+ } finally {
+ reader.close()
}
} catch {
- case _: NumberFormatException => throw malformedLineException(line)
- } finally {
- reader.close()
+ case e: IOException =>
+ logDirFailureChannel.maybeAddLogFailureEvent(logDir)
+ throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}", e)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index d32d30f..a8db688 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
import java.io._
import java.util.regex.Pattern
+import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import scala.collection._
@@ -55,10 +56,10 @@ object LeaderEpochCheckpointFile {
/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*/
-class LeaderEpochCheckpointFile(val file: File) extends LeaderEpochCheckpoint {
+class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
import LeaderEpochCheckpointFile._
- val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter)
+ val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 5f5dc97..9cd0963 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
import java.io._
import java.util.regex.Pattern
+import kafka.server.LogDirFailureChannel
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.TopicPartition
@@ -51,9 +52,9 @@ trait OffsetCheckpoint {
/**
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*/
-class OffsetCheckpointFile(val f: File) {
+class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) {
val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
- OffsetCheckpointFile.Formatter)
+ OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent)
def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
new file mode 100644
index 0000000..0bbc47d
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.controller.LogDirEventNotificationListener
+import scala.collection.Map
+
+object LogDirUtils extends Logging {
+
+ private val LogDirEventNotificationPrefix = "log_dir_event_"
+ val LogDirFailureEvent = 1
+
+ def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) {
+ val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath(
+ ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId))
+ debug("Added " + logDirEventNotificationPath + " for broker " + brokerId)
+ }
+
+ private def logDirFailureEventZkData(brokerId: Int): String = {
+ Json.encode(Map("version" -> LogDirEventNotificationListener.version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+ }
+
+ def deleteLogDirEvents(zkUtils: ZkUtils) {
+ val sequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).toSet
+ sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
+ }
+
+ def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = {
+ val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child
+ val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
+ if (jsonOpt.isDefined) {
+ val json = Json.parseFull(jsonOpt.get)
+
+ json match {
+ case Some(m) =>
+ val brokerAndEventType = m.asInstanceOf[Map[String, Any]]
+ val brokerId = brokerAndEventType.get("broker").get.asInstanceOf[Int]
+ val eventType = brokerAndEventType.get("event").get.asInstanceOf[Int]
+ if (eventType != LogDirFailureEvent)
+ throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
+ Some(brokerId)
+ case None =>
+ error("Invalid LogDirEvent JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0035120..7d3529f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -51,6 +51,7 @@ object ZkUtils {
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val IsrChangeNotificationPath = "/isr_change_notification"
+ val LogDirEventNotificationPath = "/log_dir_event_notification"
val KafkaAclPath = "/kafka-acl"
val KafkaAclChangesPath = "/kafka-acl-changes"
@@ -75,7 +76,8 @@ object ZkUtils {
IsrChangeNotificationPath,
KafkaAclPath,
KafkaAclChangesPath,
- ProducerIdBlockPath)
+ ProducerIdBlockPath,
+ LogDirEventNotificationPath)
// Important: it is necessary to add any new top level Zookeeper path that contains
// sensitive information that should not be world readable to the Seq
@@ -235,7 +237,8 @@ class ZkUtils(val zkClient: ZkClient,
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath,
- ProducerIdBlockPath)
+ ProducerIdBlockPath,
+ LogDirEventNotificationPath)
// Visible for testing
val zkPath = new ZkPath(zkClient)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 09ff9be..2b134fe 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,7 +44,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Node, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
@@ -272,7 +271,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createUpdateMetadataRequest = {
- val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava
+ val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState(
+ Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava
val securityProtocol = SecurityProtocol.PLAINTEXT
val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
@@ -303,8 +303,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
private def leaderAndIsrRequest = {
- new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
- Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
+ new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
+ Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
Set(new Node(brokerId, "localhost", 0)).asJava).build()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 921c2b4..5e3c7ab 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -37,6 +37,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerCount: Int
val consumerCount: Int
val serverCount: Int
+ var logDirCount: Int = 1
lazy val producerConfig = new Properties
lazy val consumerConfig = new Properties
lazy val serverConfig = new Properties
@@ -46,7 +47,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
- trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+ trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
cfgs.foreach { config =>
config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
@@ -84,7 +85,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
saslProperties = this.clientSaslProperties,
props = Some(producerConfig))
}
-
+
def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
TestUtils.createNewConsumer(brokerList,
securityProtocol = this.securityProtocol,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
new file mode 100644
index 0000000..6942df0
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import kafka.controller.{OfflineReplica, PartitionAndReplica}
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
+import org.junit.{Before, Test}
+import org.junit.Assert.assertTrue
+import org.junit.Assert.assertEquals
+
+/**
+ * Test whether clients can produce and consume when there is a log directory failure
+ */
+class LogDirFailureTest extends IntegrationTestHarness {
+ val producerCount: Int = 1
+ val consumerCount: Int = 1
+ val serverCount: Int = 2
+ private val topic = "topic"
+
+ this.logDirCount = 2
+ this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+ this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
+ this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100")
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ TestUtils.createTopic(zkUtils, topic, 1, 2, servers = servers)
+ }
+
+ @Test
+ def testProduceAfterLogDirFailure() {
+
+ val consumer = consumers.head
+ subscribeAndWaitForAssignment(topic, consumer)
+ val producer = producers.head
+ val partition = new TopicPartition(topic, 0)
+ val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
+
+ val leaderServerId = producer.partitionsFor(topic).get(0).leader().id()
+ val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
+
+ // The first send() should succeed
+ producer.send(record).get()
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(0).count() == 1
+ }, "Expected the first message", 3000L)
+
+ // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
+ val replica = leaderServer.replicaManager.getReplica(partition)
+ val logDir = replica.get.log.get.dir.getParentFile
+ CoreUtils.swallow(Utils.delete(logDir))
+ logDir.createNewFile()
+ assertTrue(logDir.isFile)
+
+ // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
+ TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L)
+ assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
+
+ // The second send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
+ try {
+ producer.send(record).get(6000, TimeUnit.MILLISECONDS)
+ fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case t: KafkaStorageException =>
+ case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
+ case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
+ }
+ case e: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${e.toString}")
+ }
+
+ // Wait for producer to update metadata for the partition
+ TestUtils.waitUntilTrue(() => {
+ // ProduceResponse may contain KafkaStorageException and trigger metadata update
+ producer.send(record)
+ producer.partitionsFor(topic).get(0).leader().id() != leaderServerId
+ }, "Expected new leader for the partition", 6000L)
+
+ // Consumer should receive some messages
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(0).count() > 0
+ }, "Expected some messages", 3000L)
+
+ // There should be no remaining LogDirEventNotification znode
+ assertTrue(zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).isEmpty)
+
+ // The controller should have marked the replica on the original leader as offline
+ val controllerServer = servers.find(_.kafkaController.isActive).get
+ val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+ assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
+ }
+
+ private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+ consumer.subscribe(Collections.singletonList(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(0)
+ !consumer.assignment.isEmpty
+ }, "Expected non-empty assignment")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0e57e53..760cc39 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
servers.foreach { server =>
- error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+ error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")
} catch {
@@ -436,7 +436,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
servers.foreach { case (server) =>
- error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+ error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 147e84a..ebe7223 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
- quotaManagers.follower, new BrokerTopicStats, metadataCache) {
+ quotaManagers.follower, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager) =
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a8ce17e..24e2920 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -97,7 +97,7 @@ class GroupMetadataManagerTest {
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
-
+
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -685,9 +685,9 @@ class GroupMetadataManagerTest {
assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
- assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
- assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
- assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
+ assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
+ assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
+ assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN_SERVER_ERROR)
assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
}
@@ -1311,7 +1311,7 @@ class GroupMetadataManagerTest {
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
capturedArgument
}
-
+
private def expectAppendMessage(error: Errors) {
val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index df2f7df..6323d15 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -144,7 +144,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
@Test
def shouldThrowIllegalStateExceptionWhenUnknownError(): Unit = {
- verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN)
+ verifyThrowIllegalStateExceptionOnError(Errors.UNKNOWN_SERVER_ERROR)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 6a35d41..e86e088 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -309,7 +309,7 @@ class TransactionStateManagerTest {
transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]())
transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1)
- expectedError = Errors.UNKNOWN
+ expectedError = Errors.UNKNOWN_SERVER_ERROR
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index bf36199..d6f0a56 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -20,7 +20,7 @@ import java.io.File
import java.nio.file.Files
import java.util.Properties
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.{MockTime, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
@@ -110,6 +110,7 @@ abstract class AbstractLogCleanerIntegrationTest {
new LogCleaner(cleanerConfig,
logDirs = Array(logDir),
logs = logMap,
+ logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 8a119c2..e569b29 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index b4c1790..f4eabc0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -218,7 +218,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(new TopicPartition("log", 0), log)
- val cleanerManager = new LogCleanerManager(Array(logDir), logs)
+ val cleanerManager = new LogCleanerManager(Array(logDir), logs, null)
cleanerManager
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 3e58c4d..689a032 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -881,7 +881,7 @@ class LogCleanerTest extends JUnitSuite {
checkSegmentOrder(groups)
}
- /**
+ /**
* Following the loading of a log segment where the index file is zero sized,
* the index returned would be the base offset. Sometimes the log file would
* contain data with offsets in excess of the baseOffset which would cause
@@ -1324,7 +1324,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
lastOffset = offset
map.put(keyFor(key), offset)
}
-
+
override def get(key: ByteBuffer): Long = {
val k = keyFor(key)
if(map.containsKey(k))
@@ -1332,9 +1332,9 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
else
-1L
}
-
+
override def clear(): Unit = map.clear()
-
+
override def size: Int = map.size
override def latestOffset: Long = lastOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8b7819f..0826747 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,12 +21,10 @@ import java.io._
import java.util.Properties
import kafka.common._
-import kafka.server.FetchDataInfo
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -52,7 +50,7 @@ class LogManagerTest {
logDir = TestUtils.tempDir()
logManager = createLogManager()
logManager.startup()
- logDir = logManager.logDirs(0)
+ logDir = logManager.liveLogDirs(0)
}
@After
@@ -60,7 +58,7 @@ class LogManagerTest {
if(logManager != null)
logManager.shutdown()
Utils.delete(logDir)
- logManager.logDirs.foreach(Utils.delete)
+ logManager.liveLogDirs.foreach(Utils.delete)
}
/**
@@ -68,7 +66,7 @@ class LogManagerTest {
*/
@Test
def testCreateLog() {
- val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
+ val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
val logFile = new File(logDir, name + "-0")
assertTrue(logFile.exists)
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -90,7 +88,7 @@ class LogManagerTest {
*/
@Test
def testCleanupExpiredSegments() {
- val log = logManager.createLog(new TopicPartition(name, 0), logConfig)
+ val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
var offset = 0L
for(_ <- 0 until 200) {
val set = TestUtils.singletonRecords("test".getBytes())
@@ -135,7 +133,7 @@ class LogManagerTest {
logManager.startup()
// create a log
- val log = logManager.createLog(new TopicPartition(name, 0), config)
+ val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config)
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
@@ -175,7 +173,7 @@ class LogManagerTest {
def testDoesntCleanLogsWithCompactDeletePolicy() {
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
- val log = logManager.createLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
+ val log = logManager.getOrCreateLog(new TopicPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
var offset = 0L
for (_ <- 0 until 200) {
val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
@@ -204,7 +202,7 @@ class LogManagerTest {
logManager = createLogManager()
logManager.startup()
- val log = logManager.createLog(new TopicPartition(name, 0), config)
+ val log = logManager.getOrCreateLog(new TopicPartition(name, 0), config)
val lastFlush = log.lastFlushTime
for (_ <- 0 until 200) {
val set = TestUtils.singletonRecords("test".getBytes())
@@ -228,7 +226,7 @@ class LogManagerTest {
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
- logManager.createLog(new TopicPartition("test", partition), logConfig)
+ logManager.getOrCreateLog(new TopicPartition("test", partition), logConfig)
assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
@@ -286,7 +284,7 @@ class LogManagerTest {
private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition],
logManager: LogManager) {
- val logs = topicPartitions.map(this.logManager.createLog(_, logConfig))
+ val logs = topicPartitions.map(this.logManager.getOrCreateLog(_, logConfig))
logs.foreach(log => {
for (_ <- 0 until 50)
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -294,7 +292,7 @@ class LogManagerTest {
log.flush()
})
- logManager.checkpointRecoveryPointOffsets()
+ logManager.checkpointLogRecoveryOffsets()
val checkpoints = new OffsetCheckpointFile(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
topicPartitions.zip(logs).foreach {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 79fe220..30ccc8b 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConverters._
import scala.collection._
class LogSegmentTest {
-
+
val topicPartition = new TopicPartition("topic", 0)
val segments = mutable.ArrayBuffer[LogSegment]()
var logDir: File = _
@@ -52,7 +52,7 @@ class LogSegmentTest {
segments += seg
seg
}
-
+
/* create a ByteBufferMessageSet for the given messages starting from the given offset */
def records(offset: Long, records: String*): MemoryRecords = {
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, CompressionType.NONE, TimestampType.CREATE_TIME,
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 008cd27..2213d09 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -254,7 +254,8 @@ class LogTest {
maxProducerIdExpirationMs = 300000,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = Log.parseTopicPartitionName(logDir),
- stateManager)
+ producerStateManager = stateManager,
+ logDirFailureChannel = null)
EasyMock.verify(stateManager)
@@ -322,7 +323,8 @@ class LogTest {
maxProducerIdExpirationMs = 300000,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = Log.parseTopicPartitionName(logDir),
- stateManager)
+ producerStateManager = stateManager,
+ logDirFailureChannel = null)
EasyMock.verify(stateManager)
}
@@ -356,7 +358,8 @@ class LogTest {
maxProducerIdExpirationMs = 300000,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = Log.parseTopicPartitionName(logDir),
- stateManager)
+ producerStateManager = stateManager,
+ logDirFailureChannel = null)
EasyMock.verify(stateManager)
cleanShutdownFile.delete()
@@ -391,7 +394,8 @@ class LogTest {
maxProducerIdExpirationMs = 300000,
producerIdExpirationCheckIntervalMs = 30000,
topicPartition = Log.parseTopicPartitionName(logDir),
- stateManager)
+ producerStateManager = stateManager,
+ logDirFailureChannel = null)
EasyMock.verify(stateManager)
cleanShutdownFile.delete()
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index b6b40c2..5e63500 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Utils
import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
-import kafka.common._
import kafka.cluster.Replica
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import java.util.concurrent.atomic.AtomicBoolean
@@ -35,24 +34,28 @@ class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"
+ val zkUtils = EasyMock.createMock(classOf[ZkUtils])
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)).toArray,
cleanerConfig = CleanerConfig())
}
-
+
+ val logDirFailureChannels = configs map { config =>
+ new LogDirFailureChannel(config.logDirs.size)
+ }
+
@After
def teardown() {
- for(manager <- logManagers; dir <- manager.logDirs)
+ for(manager <- logManagers; dir <- manager.liveLogDirs)
Utils.delete(dir)
}
@Test
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
- val zkUtils = EasyMock.createMock(classOf[ZkUtils])
EasyMock.replay(zkUtils)
-
+
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
@@ -61,7 +64,7 @@ class HighwatermarkPersistenceTest {
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
- new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
+ new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -69,7 +72,7 @@ class HighwatermarkPersistenceTest {
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
// create leader and follower replicas
- val log0 = logManagers.head.createLog(new TopicPartition(topic, 0), LogConfig())
+ val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0))
partition0.addReplicaIfNotExists(leaderReplicaPartition0)
val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time)
@@ -96,7 +99,6 @@ class HighwatermarkPersistenceTest {
val topic1 = "foo1"
val topic2 = "foo2"
// mock zkclient
- val zkUtils = EasyMock.createMock(classOf[ZkUtils])
EasyMock.replay(zkUtils)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
@@ -106,7 +108,7 @@ class HighwatermarkPersistenceTest {
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
- new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
+ new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -114,7 +116,7 @@ class HighwatermarkPersistenceTest {
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic1, 0))
// create leader log
- val topic1Log0 = logManagers.head.createLog(new TopicPartition(topic1, 0), LogConfig())
+ val topic1Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic1, 0), LogConfig())
// create a local replica for topic1
val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0))
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -130,7 +132,7 @@ class HighwatermarkPersistenceTest {
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic2, 0))
// create leader log
- val topic2Log0 = logManagers.head.createLog(new TopicPartition(topic2, 0), LogConfig())
+ val topic2Log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic2, 0), LogConfig())
// create a local replica for topic2
val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0))
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
@@ -163,5 +165,5 @@ class HighwatermarkPersistenceTest {
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
new TopicPartition(topic, partition), 0L)
}
-
+
}