You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/01/08 07:32:22 UTC
git commit: kafka-1074; Reassign partitions should delete the old replicas from disk; patched by Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang
Updated Branches:
refs/heads/trunk 15f3c8417 -> f63e3f730
kafka-1074; Reassign partitions should delete the old replicas from disk; patched by Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f63e3f73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f63e3f73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f63e3f73
Branch: refs/heads/trunk
Commit: f63e3f730673f60ca179030cf0d64380689c2a40
Parents: 15f3c84
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 7 22:36:00 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 7 22:36:00 2014 -0800
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 19 +-
.../common/LogCleaningAbortedException.scala | 24 +++
.../common/OptimisticLockFailureException.scala | 23 --
.../kafka/common/ThreadShutdownException.scala | 24 +++
.../kafka/controller/KafkaController.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 37 ++--
core/src/main/scala/kafka/log/LogCleaner.scala | 208 +++++++------------
.../scala/kafka/log/LogCleanerManager.scala | 188 +++++++++++++++++
core/src/main/scala/kafka/log/LogManager.scala | 76 ++++---
.../kafka/server/ReplicaFetcherThread.scala | 7 +-
.../scala/kafka/server/ReplicaManager.scala | 18 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 15 +-
.../test/scala/unit/kafka/log/CleanerTest.scala | 39 ++--
13 files changed, 452 insertions(+), 228 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5c9307d..1087a2e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,10 +25,11 @@ import kafka.log.LogConfig
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
-import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import kafka.controller.KafkaController
import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import java.io.IOException
/**
@@ -135,6 +136,22 @@ class Partition(val topic: String,
assignedReplicaMap.remove(replicaId)
}
+ def delete() {
+ // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
+ leaderIsrUpdateLock synchronized {
+ assignedReplicaMap.clear()
+ inSyncReplicas = Set.empty[Replica]
+ leaderReplicaIdOpt = None
+ try {
+ logManager.deleteLog(TopicAndPartition(topic, partitionId))
+ } catch {
+ case e: IOException =>
+ fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
+ Runtime.getRuntime().halt(1)
+ }
+ }
+ }
+
def getLeaderEpoch(): Int = {
leaderIsrUpdateLock synchronized {
return this.leaderEpoch
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
new file mode 100644
index 0000000..5ea6632
--- /dev/null
+++ b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a log cleaning task is requested to be aborted.
+ */
+class LogCleaningAbortedException() extends RuntimeException() {
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
deleted file mode 100644
index 0e69110..0000000
--- a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when an optimistic locking attempt receives concurrent modifications
- */
-class OptimisticLockFailureException(message: String) extends RuntimeException(message)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/ThreadShutdownException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/core/src/main/scala/kafka/common/ThreadShutdownException.scala
new file mode 100644
index 0000000..6554a5e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ThreadShutdownException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * An exception that indicates a thread is being shut down normally.
+ */
+class ThreadShutdownException() extends RuntimeException {
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6215cb8..03ef9cf 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -413,7 +413,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part
* of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to
* the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
- * RAR - AR. Currently, NonExistentReplica state change is a NO-OP
+ * RAR - AR. As part of the NonExistentReplica state change, we delete replicas in RAR - AR.
* 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
* 7. Remove partition from the /admin/reassign_partitions path
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index beda421..b3ab522 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -63,12 +63,11 @@ class Log(val dir: File,
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
- /* The number of times the log has been truncated */
- private val truncates = new AtomicInteger(0)
-
/* Calculate the offset of the next message */
private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+ val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
+
info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
newGauge(name + "-" + "NumLogSegments",
@@ -202,11 +201,6 @@ class Log(val dir: File,
def numberOfSegments: Int = segments.size
/**
- * The number of truncates that have occurred since the log was opened.
- */
- def numberOfTruncates: Int = truncates.get
-
- /**
* Close this log
*/
def close() {
@@ -524,16 +518,19 @@ class Log(val dir: File,
/**
* Completely delete this log directory and all contents from the file system with no delay
*/
- def delete(): Unit = {
- logSegments.foreach(_.delete())
- Utils.rm(dir)
+ private[log] def delete() {
+ lock synchronized {
+ logSegments.foreach(_.delete())
+ segments.clear()
+ Utils.rm(dir)
+ }
}
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
*/
- def truncateTo(targetOffset: Long) {
+ private[log] def truncateTo(targetOffset: Long) {
info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
@@ -551,7 +548,6 @@ class Log(val dir: File,
this.nextOffset.set(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
}
- truncates.getAndIncrement
}
}
@@ -559,7 +555,7 @@ class Log(val dir: File,
* Delete all data in the log and start at the new offset
* @param newOffset The new offset to start the log with
*/
- def truncateFullyAndStartAt(newOffset: Long) {
+ private[log] def truncateFullyAndStartAt(newOffset: Long) {
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
@@ -571,7 +567,6 @@ class Log(val dir: File,
time = time))
this.nextOffset.set(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
- truncates.getAndIncrement
}
}
@@ -650,10 +645,8 @@ class Log(val dir: File,
* @param newSegment The new log segment to add to the log
* @param oldSegments The old log segments to delete from the log
*/
- private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], expectedTruncates: Int) {
+ private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) {
lock synchronized {
- if(expectedTruncates != numberOfTruncates)
- throw new OptimisticLockFailureException("The log has been truncated, expected %d but found %d.".format(expectedTruncates, numberOfTruncates))
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
@@ -735,5 +728,13 @@ object Log {
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
+
+ /**
+ * Parse the topic and partition out of the directory name of a log
+ */
+ def parseTopicPartitionName(name: String): TopicAndPartition = {
+ val index = name.lastIndexOf('-')
+ TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ccde2ab..6404647 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -20,15 +20,12 @@ package kafka.log
import scala.collection._
import scala.math
import java.nio._
-import java.util.concurrent.Semaphore
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import java.util.Date
import java.io.File
import kafka.common._
import kafka.message._
-import kafka.server.OffsetCheckpoint
import kafka.utils._
+import java.lang.IllegalStateException
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -67,19 +64,9 @@ class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
val logs: Pool[TopicAndPartition, Log],
time: Time = SystemTime) extends Logging {
-
- /* the offset checkpoints holding the last cleaned point for each log */
- private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
-
- /* the set of logs currently being cleaned */
- private val inProgress = mutable.HashSet[TopicAndPartition]()
-
- /* a global lock used to control all access to the in-progress set and the offset checkpoints */
- private val lock = new Object
-
- /* a counter for creating unique thread names*/
- private val threadId = new AtomicInteger(0)
-
+ /* for managing the state of partitions being cleaned. */
+ private val cleanerManager = new LogCleanerManager(logDirs, logs);
+
/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
checkIntervalMs = 300,
@@ -87,10 +74,7 @@ class LogCleaner(val config: CleanerConfig,
time = time)
/* the threads */
- private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread())
-
- /* a hook for testing to synchronize on log cleaning completions */
- private val cleaned = new Semaphore(0)
+ private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
/**
* Start the background cleaning
@@ -105,102 +89,79 @@ class LogCleaner(val config: CleanerConfig,
*/
def shutdown() {
info("Shutting down the log cleaner.")
- cleaners.foreach(_.interrupt())
- cleaners.foreach(_.join())
+ cleaners.foreach(_.shutdown())
}
/**
- * For testing, a way to know when work has completed. This method blocks until the
- * cleaner has processed up to the given offset on the specified topic/partition
+ * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+ * the partition is aborted.
*/
- def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
- while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
- cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS)
+ def abortCleaning(topicAndPartition: TopicAndPartition) {
+ cleanerManager.abortCleaning(topicAndPartition)
}
-
+
/**
- * @return the position processed for all logs.
+ * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+ * This call blocks until the cleaning of the partition is aborted and paused.
*/
- def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
- checkpoints.values.flatMap(_.read()).toMap
-
- /**
- * Choose the log to clean next and add it to the in-progress set. We recompute this
- * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
- * the log manager maintains.
- */
- private def grabFilthiestLog(): Option[LogToClean] = {
- lock synchronized {
- val lastClean = allCleanerCheckpoints()
- val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe
- .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
- .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
- val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0) // must have some bytes
- .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
- if(dirtyLogs.isEmpty) {
- None
- } else {
- val filthiest = dirtyLogs.max
- inProgress += filthiest.topicPartition
- Some(filthiest)
- }
- }
+ def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+ cleanerManager.abortAndPauseCleaning(topicAndPartition)
}
-
+
/**
- * Save out the endOffset and remove the given log from the in-progress set.
+ * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
*/
- private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
- lock synchronized {
- val checkpoint = checkpoints(dataDir)
- val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
- checkpoint.write(offsets)
- inProgress -= topicAndPartition
- }
- cleaned.release()
+ def resumeCleaning(topicAndPartition: TopicAndPartition) {
+ cleanerManager.resumeCleaning(topicAndPartition)
}
/**
+ * For testing, a way to know when work has completed. This method blocks until a cleaner
+ * checkpoint exists for the specified topic/partition. TODO: honor the offset and timeout
+ * parameters; both are currently ignored, so this can block indefinitely.
+ */
+ def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
+ while(!cleanerManager.allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
+ Thread.sleep(10)
+ }
+
+ /**
* The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
*/
- private class CleanerThread extends Thread {
+ private class CleanerThread(threadId: Int)
+ extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
- val cleaner = new Cleaner(id = threadId.getAndIncrement(),
+
+ val cleaner = new Cleaner(id = threadId,
offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
hashAlgorithm = config.hashAlgorithm),
ioBufferSize = config.ioBufferSize / config.numThreads / 2,
maxIoBufferSize = config.maxMessageSize,
dupBufferLoadFactor = config.dedupeBufferLoadFactor,
throttler = throttler,
- time = time)
+ time = time,
+ checkDone = checkDone)
- setName("kafka-log-cleaner-thread-" + cleaner.id)
- setDaemon(false)
+ private def checkDone(topicAndPartition: TopicAndPartition) {
+ if (!isRunning.get())
+ throw new ThreadShutdownException
+ cleanerManager.checkCleaningAborted(topicAndPartition)
+ }
/**
* The main loop for the cleaner thread
*/
- override def run() {
- info("Starting cleaner thread %d...".format(cleaner.id))
- try {
- while(!isInterrupted) {
- cleanOrSleep()
- }
- } catch {
- case e: InterruptedException => // all done
- case e: Exception =>
- error("Error in cleaner thread %d:".format(cleaner.id), e)
- }
- info("Shutting down cleaner thread %d.".format(cleaner.id))
+ override def doWork() {
+ cleanOrSleep()
}
/**
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
private def cleanOrSleep() {
- grabFilthiestLog() match {
+ cleanerManager.grabFilthiestLog() match {
case None =>
// there are no cleanable logs, sleep a while
time.sleep(config.backOffMs)
@@ -211,10 +172,9 @@ class LogCleaner(val config: CleanerConfig,
endOffset = cleaner.clean(cleanable)
logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
} catch {
- case e: OptimisticLockFailureException =>
- info("Cleaning of log was aborted due to colliding truncate operation.")
+ case pe: LogCleaningAbortedException => // task can be aborted, let it go.
} finally {
- doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+ cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
}
}
@@ -260,7 +220,8 @@ private[log] class Cleaner(val id: Int,
maxIoBufferSize: Int,
dupBufferLoadFactor: Double,
throttler: Throttler,
- time: Time) extends Logging {
+ time: Time,
+ checkDone: (TopicAndPartition) => Unit) extends Logging {
this.logIdent = "Cleaner " + id + ": "
@@ -284,8 +245,7 @@ private[log] class Cleaner(val id: Int,
stats.clear()
info("Beginning cleaning of log %s.".format(cleanable.log.name))
val log = cleanable.log
- val truncateCount = log.numberOfTruncates
-
+
// build the offset map
info("Building offset map for %s...".format(cleanable.log.name))
val upperBoundOffset = log.activeSegment.baseOffset
@@ -303,7 +263,7 @@ private[log] class Cleaner(val id: Int,
// group the segments and clean the groups
info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
- cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs)
+ cleanSegments(log, group, offsetMap, deleteHorizonMs)
stats.allDone()
endOffset
@@ -318,10 +278,9 @@ private[log] class Cleaner(val id: Int,
* @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
- private[log] def cleanSegments(log: Log,
+ private[log] def cleanSegments(log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
- expectedTruncateCount: Int,
deleteHorizonMs: Long) {
// create a new segment with the suffix .cleaned appended to both the log and index name
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
@@ -332,32 +291,32 @@ private[log] class Cleaner(val id: Int,
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
- // clean segments into the new destination segment
- for (old <- segments) {
- val retainDeletes = old.lastModified > deleteHorizonMs
- info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
- .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
- cleanInto(old, cleaned, map, retainDeletes)
- }
-
- // trim excess index
- index.trimToValidSize()
-
- // flush new segment to disk before swap
- cleaned.flush()
-
- // update the modification date to retain the last modified date of the original files
- val modified = segments.last.lastModified
- cleaned.lastModified = modified
-
- // swap in new segment
- info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
try {
- log.replaceSegments(cleaned, segments, expectedTruncateCount)
+ // clean segments into the new destination segment
+ for (old <- segments) {
+ val retainDeletes = old.lastModified > deleteHorizonMs
+ info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
+ .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+ cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
+ }
+
+ // trim excess index
+ index.trimToValidSize()
+
+ // flush new segment to disk before swap
+ cleaned.flush()
+
+ // update the modification date to retain the last modified date of the original files
+ val modified = segments.last.lastModified
+ cleaned.lastModified = modified
+
+ // swap in new segment
+ info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+ log.replaceSegments(cleaned, segments)
} catch {
- case e: OptimisticLockFailureException =>
+ case e: LogCleaningAbortedException =>
cleaned.delete()
- throw e
+ throw e
}
}
@@ -372,10 +331,11 @@ private[log] class Cleaner(val id: Int,
*
* TODO: Implement proper compression support
*/
- private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
+ private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
+ dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
var position = 0
while (position < source.log.sizeInBytes) {
- checkDone()
+ checkDone(topicAndPartition)
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
@@ -491,9 +451,9 @@ private[log] class Cleaner(val id: Int,
require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
for (segment <- dirty) {
- checkDone()
+ checkDone(log.topicAndPartition)
if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
- offset = buildOffsetMap(segment, map)
+ offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
}
info("Offset map for log %s complete.".format(log.name))
offset
@@ -507,11 +467,11 @@ private[log] class Cleaner(val id: Int,
*
* @return The final offset covered by the map
*/
- private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = {
+ private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
var position = 0
var offset = segment.baseOffset
while (position < segment.log.sizeInBytes) {
- checkDone()
+ checkDone(topicAndPartition)
readBuffer.clear()
val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
throttler.maybeThrottle(messages.sizeInBytes)
@@ -532,14 +492,6 @@ private[log] class Cleaner(val id: Int,
restoreBuffers()
offset
}
-
- /**
- * If we aren't running any more throw an AllDoneException
- */
- private def checkDone() {
- if (Thread.currentThread.isInterrupted)
- throw new InterruptedException
- }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
new file mode 100644
index 0000000..1612c8d
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import kafka.utils.{Logging, Pool}
+import kafka.server.OffsetCheckpoint
+import collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils._
+import java.util.concurrent.TimeUnit
+import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+
+private[log] sealed trait LogCleaningState
+/** The partition is currently being cleaned. */
+private[log] case object LogCleaningInProgress extends LogCleaningState
+/** An abort was requested, but doneCleaning() has not yet been called for the aborted cleaning task. */
+private[log] case object LogCleaningAborted extends LogCleaningState
+/** The partition won't be chosen for cleaning until resumeCleaning() is called. */
+private[log] case object LogCleaningPaused extends LogCleaningState
+
+/**
+ * Manage the state of each partition being cleaned.
+ * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
+ * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
+ * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
+ * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
+ * requested to be resumed.
+ */
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+  /* the offset checkpoints holding the last cleaned point for each log */
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+
+  /* the cleaning state of every partition that is currently being cleaned, aborted or paused */
+  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+
+  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+  private val lock = new ReentrantLock
+  /* signalled when an aborted partition becomes paused; see abortAndPauseCleaning() and doneCleaning() */
+  private val pausedCleaningCond = lock.newCondition()
+
+  /**
+   * @return the last cleaned offset of every log, as recorded in the cleaner offset checkpoint files.
+   */
+  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
+    checkpoints.values.flatMap(_.read()).toMap
+
+  /**
+   * Choose the log to clean next and add it to the in-progress set. We recompute this
+   * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
+   * the log manager maintains.
+   */
+  def grabFilthiestLog(): Option[LogToClean] = {
+    inLock(lock) {
+      val lastClean = allCleanerCheckpoints()
+      val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe
+        .filterNot(l => inProgress.contains(l._1)) // skip logs being cleaned, aborted or paused
+        .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
+      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0) // must have some bytes
+        .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(dirtyLogs.isEmpty) {
+        None
+      } else {
+        val filthiest = dirtyLogs.max
+        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
+        Some(filthiest)
+      }
+    }
+  }
+
+  /**
+   * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+   * the partition is aborted.
+   * This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
+   * @throws IllegalStateException if the cleaning of the partition is already aborted or paused
+   */
+  def abortCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      abortAndPauseCleaning(topicAndPartition)
+      resumeCleaning(topicAndPartition)
+      info("The cleaning for partition %s is aborted".format(topicAndPartition))
+    }
+  }
+
+  /**
+   * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+   * This call blocks until the cleaning of the partition is aborted and paused.
+   * 1. If the partition is not in progress, mark it as paused.
+   * 2. Otherwise, first mark the state of the partition as aborted.
+   * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
+   *    throws a LogCleaningAbortedException to stop the cleaning task.
+   * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
+   * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   * @throws IllegalStateException if the cleaning of the partition is already aborted or paused
+   */
+  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+        case Some(LogCleaningInProgress) =>
+          inProgress.put(topicAndPartition, LogCleaningAborted)
+        case Some(s) =>
+          throw new IllegalStateException("Partition %s can't be aborted and paused since it's in %s state".format(topicAndPartition, s))
+      }
+      // re-check the state after every wake-up (signal, timeout or spurious), releasing the lock while waiting
+      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
+      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
+    }
+  }
+
+  /**
+   * Resume the cleaning of a paused partition, so that it can be chosen for cleaning again.
+   * @throws IllegalStateException if the partition is not in the paused state
+   */
+  def resumeCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          throw new IllegalStateException("Partition %s can't be resumed since it's never paused".format(topicAndPartition))
+        case Some(LogCleaningPaused) =>
+          inProgress.remove(topicAndPartition)
+        case Some(s) =>
+          throw new IllegalStateException("Partition %s can't be resumed since it's in %s state".format(topicAndPartition, s))
+      }
+    }
+    info("The cleaning for partition %s is resumed".format(topicAndPartition))
+  }
+
+  /**
+   * Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
+   * @return true iff a state is recorded for the partition and it equals expectedState
+   */
+  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
+    inProgress.get(topicAndPartition) == Some(expectedState)
+
+  /**
+   * Check if the cleaning for a partition is aborted. If so, throw an exception.
+   * @throws LogCleaningAbortedException if an abort has been requested for the partition
+   */
+  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
+        throw new LogCleaningAbortedException()
+    }
+  }
+
+  /**
+   * To be called when the cleaning of a partition stops, whether it completed or was aborted.
+   * If it completed, save out endOffset to the cleaner checkpoint of dataDir and remove the partition from the
+   * in-progress set. If it was aborted, move the partition to the paused state and wake up the threads waiting in
+   * abortAndPauseCleaning().
+   * @throws IllegalStateException if the partition is in the paused state
+   * @throws NoSuchElementException if no cleaning state is recorded for the partition
+   */
+  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+    inLock(lock) {
+      inProgress(topicAndPartition) match {
+        case LogCleaningInProgress =>
+          val checkpoint = checkpoints(dataDir)
+          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
+          checkpoint.write(offsets)
+          inProgress.remove(topicAndPartition)
+        case LogCleaningAborted =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+          pausedCleaningCond.signalAll()
+        case s =>
+          throw new IllegalStateException("In-progress partition %s can't be in %s state".format(topicAndPartition, s))
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 81be88a..10062af 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,10 +22,8 @@ import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.KafkaConfig
import kafka.server.OffsetCheckpoint
-
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
* All read and write operations are delegated to the individual log instances.
@@ -50,9 +48,9 @@ class LogManager(val logDirs: Array[File],
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
- private val logCreationLock = new Object
+ private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
-
+
createAndValidateLogDirs(logDirs)
private var dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
@@ -115,7 +113,7 @@ class LogManager(val logDirs: Array[File],
for(dir <- subDirs) {
if(dir.isDirectory) {
info("Loading log '" + dir.getName + "'")
- val topicPartition = parseTopicPartitionName(dir.getName)
+ val topicPartition = Log.parseTopicPartitionName(dir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val log = new Log(dir,
config,
@@ -194,12 +192,36 @@ class LogManager(val logDirs: Array[File],
val log = logs.get(topicAndPartition)
// If the log does not exist, skip it
if (log != null) {
+ //May need to abort and pause the cleaning of the log, and resume after truncation is done.
+ val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
+ if (needToStopCleaner && cleaner != null)
+ cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
+ if (needToStopCleaner && cleaner != null)
+ cleaner.resumeCleaning(topicAndPartition)
}
}
checkpointRecoveryPointOffsets()
}
-
+
+  /**
+   * Delete all data in a partition and start the log at the new offset. Cleaning of the log is aborted and
+   * paused for the duration of the truncation, and is resumed afterwards even if the truncation throws.
+   * @param newOffset The new offset to start the log with
+   */
+  def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
+    val log = logs.get(topicAndPartition)
+    // If the log does not exist, skip it
+    if (log != null) {
+      if (cleaner != null)
+        cleaner.abortAndPauseCleaning(topicAndPartition)
+      // resume in a finally block so that a failed truncation cannot leave cleaning of the log paused
+      try log.truncateFullyAndStartAt(newOffset)
+      finally if (cleaner != null) cleaner.resumeCleaning(topicAndPartition)
+    }
+    checkpointRecoveryPointOffsets()
+  }
+
/**
* Write out the current recovery point for all logs to a text file in the log directory
* to avoid recovering the whole log on startup.
@@ -229,7 +251,7 @@ class LogManager(val logDirs: Array[File],
* If the log already exists, just return a copy of the existing log
*/
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
- logCreationLock synchronized {
+ logCreationOrDeletionLock synchronized {
var log = logs.get(topicAndPartition)
// check if the log has already been created in another thread
@@ -254,7 +276,27 @@ class LogManager(val logDirs: Array[File],
log
}
}
-
+
+  /**
+   * Remove the log of a partition from the log manager and delete it, after aborting any cleaning in progress.
+   */
+  def deleteLog(topicAndPartition: TopicAndPartition) {
+    // take the log out of the pool under the same lock createLog() uses, so creation and deletion are serialized
+    val removedLog: Log = logCreationOrDeletionLock synchronized {
+      logs.remove(topicAndPartition)
+    }
+    if (removedLog != null) {
+      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+      if (cleaner != null)
+        cleaner.abortCleaning(topicAndPartition)
+      removedLog.delete()
+      info("Deleted log for partition [%s,%d] in %s."
+           .format(topicAndPartition.topic,
+                   topicAndPartition.partition,
+                   removedLog.dir.getAbsolutePath))
+    }
+  }
+
/**
* Choose the next directory in which to create a log. Currently this is done
* by calculating the number of partitions in each directory and then choosing the
@@ -280,7 +322,6 @@ class LogManager(val logDirs: Array[File],
*/
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
- val topic = parseTopicPartitionName(log.name).topic
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
@@ -289,7 +330,6 @@ class LogManager(val logDirs: Array[File],
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
- val topic = parseTopicPartitionName(log.dir.getName).topic
if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
return 0
var diff = log.size - log.config.retentionSize
@@ -334,6 +374,7 @@ class LogManager(val logDirs: Array[File],
*/
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
+
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
@@ -344,22 +385,7 @@ class LogManager(val logDirs: Array[File],
} catch {
case e: Throwable =>
error("Error flushing topic " + topicAndPartition.topic, e)
- e match {
- case _: IOException =>
- fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
- System.exit(1)
- case _ =>
- }
}
}
}
-
- /**
- * Parse the topic and partition out of the directory name of a log
- */
- private def parseTopicPartitionName(name: String): TopicAndPartition = {
- val index = name.lastIndexOf('-')
- TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 715845b..73e605e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -68,7 +68,6 @@ class ReplicaFetcherThread(name:String,
*/
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
- val log = replica.log.get
/**
* Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -81,8 +80,8 @@ class ReplicaFetcherThread(name:String,
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*/
val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
- if (leaderEndOffset < log.logEndOffset) {
- log.truncateTo(leaderEndOffset)
+ if (leaderEndOffset < replica.logEndOffset) {
+ replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
.format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
leaderEndOffset
@@ -94,7 +93,7 @@ class ReplicaFetcherThread(name:String,
* Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
*/
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
- log.truncateFullyAndStartAt(leaderStartOffset)
+ replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
.format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
leaderStartOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 242c18d..f9d10d3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
-import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import kafka.controller.KafkaController
import org.apache.log4j.Logger
@@ -116,16 +116,16 @@ class ReplicaManager(val config: KafkaConfig,
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
val errorCode = ErrorMapping.NoError
- getReplica(topic, partitionId) match {
- case Some(replica) =>
- /* TODO: handle deleteLog in a better way */
- //if (deletePartition)
- // logManager.deleteLog(topic, partition)
+ getPartition(topic, partitionId) match {
+ case Some(partition) =>
leaderPartitionsLock synchronized {
- leaderPartitions -= replica.partition
+ leaderPartitions -= partition
+ }
+ if(deletePartition) {
+ val removedPartition = allPartitions.remove((topic, partitionId))
+ if (removedPartition != null)
+ removedPartition.delete() // this will delete the local log
}
- if(deletePartition)
- allPartitions.remove((topic, partitionId))
case None => //do nothing if replica no longer exists
}
stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 52d35a3..59de1b4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -23,9 +23,10 @@ import java.util.Properties
import kafka.utils._
import kafka.log._
import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{KafkaServer, KafkaConfig}
import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, TopicAndPartition}
+import kafka.server.{KafkaServer, KafkaConfig}
+import java.io.File
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -132,6 +133,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
}
}
+  /** @return ids of the servers whose first log dir contains a directory for the given partition */
+  private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = {
+    val partitionDirName = topic + "-" + partitionId
+    servers.collect { case s if new File(s.config.logDirs.head, partitionDirName).exists => s.config.brokerId }.toSet
+  }
+
@Test
def testPartitionReassignmentWithLeaderInNewReplicas() {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
@@ -157,6 +164,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -184,6 +192,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -211,6 +220,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
@@ -251,6 +261,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
// ensure that there are no under replicated partitions
ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+ assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
servers.foreach(_.shutdown())
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 5a312bf..51cd94b 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import org.junit.{After, Test}
import java.nio._
import java.io.File
import scala.collection._
@@ -62,7 +62,7 @@ class CleanerTest extends JUnitSuite {
keys.foreach(k => map.put(key(k), Long.MaxValue))
// clean the log
- cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
+ cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
val shouldRemain = keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, keysInLog(log))
}
@@ -94,29 +94,31 @@ class CleanerTest extends JUnitSuite {
/* extract all the keys from a log */
def keysInLog(log: Log): Iterable[Int] =
log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
-
-
+
+  /** A checkDone hook that always aborts, making the cleaner throw LogCleaningAbortedException. */
+  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit =
+    throw new LogCleaningAbortedException()
+
/**
- * Test that a truncation during cleaning throws an OptimisticLockFailureException
+ * Test that abortion during cleaning throws a LogCleaningAbortedException
*/
@Test
- def testCleanSegmentsWithTruncation() {
- val cleaner = makeCleaner(Int.MaxValue)
+ def testCleanSegmentsWithAbort() {
+ val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
val log = makeLog(config = logConfig.copy(segmentSize = 1024))
-
+
// append messages to the log until we have four segments
- while(log.numberOfSegments < 2)
+ while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-
- log.truncateTo(log.logEndOffset-2)
+
val keys = keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
- intercept[OptimisticLockFailureException] {
- cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
+ intercept[LogCleaningAbortedException] {
+ cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
}
}
-
+
/**
* Validate the logic for grouping log segments together for cleaning
*/
@@ -196,15 +198,18 @@ class CleanerTest extends JUnitSuite {
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
-
- def makeCleaner(capacity: Int) =
+
+  def noOpCheckDone(topicAndPartition: TopicAndPartition): Unit = () // never requests an abort
+
+ def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone) =
new Cleaner(id = 0,
offsetMap = new FakeOffsetMap(capacity),
ioBufferSize = 64*1024,
maxIoBufferSize = 64*1024,
dupBufferLoadFactor = 0.75,
throttler = throttler,
- time = time)
+ time = time,
+ checkDone = checkDone )
def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
for((key, value) <- seq)