Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/26 01:42:08 UTC
svn commit: r1365841 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/message/ main/scala/kafka/server/ main/scala/kafka/utils/
test/scala/unit/kafka/log/ test/scal...
Author: nehanarkhede
Date: Wed Jul 25 23:42:07 2012
New Revision: 1365841
URL: http://svn.apache.org/viewvc?rev=1365841&view=rev
Log:
KAFKA-405 Improve high watermark maintenance to store high watermarks for all partitions in a single .highwatermark file; patched by Neha Narkhede; reviewed by Jay Kreps and Jun Rao
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
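For reference, a hypothetical example of the new consolidated checkpoint file that this change introduces (the version/count/entry layout is documented in HighwaterMarkCheckpoint.scala below; topic names, partition ids and offsets here are made up):

    0
    2
    test-topic 0 120
    test-topic 1 87

The first line is the file format version, the second is the number of entries, and each remaining line is "topic partition highwatermark".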
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Jul 25 23:42:07 2012
@@ -56,7 +56,9 @@ class Partition(val topic: String,
assignedReplicas
}
- def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+ def getReplica(replicaId: Int): Option[Replica] = {
+ assignedReplicas().find(_.brokerId == replicaId)
+ }
def addReplica(replica: Replica): Boolean = {
if(!assignedReplicas.contains(replica)) {
@@ -65,8 +67,7 @@ class Partition(val topic: String,
}else false
}
- def updateReplicaLEO(replica: Replica, leo: Long) {
- replica.leoUpdateTime = time.milliseconds
+ def updateReplicaLeo(replica: Replica, leo: Long) {
replica.logEndOffset(Some(leo))
debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
}
@@ -108,7 +109,7 @@ class Partition(val topic: String,
val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
possiblyStuckReplicas.map(_.brokerId).mkString(",")))
- val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
+ val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs))
info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
val leader = leaderReplica()
// Case 2 above
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Wed Jul 25 23:42:07 2012
@@ -18,43 +18,38 @@
package kafka.cluster
import kafka.log.Log
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Time, Logging}
import kafka.common.KafkaException
class Replica(val brokerId: Int,
val partition: Partition,
val topic: String,
- var log: Option[Log] = None,
- var leoUpdateTime: Long = -1L) extends Logging {
+ time: Time = SystemTime,
+ var hw: Option[Long] = None,
+ var log: Option[Log] = None) extends Logging {
private var logEndOffset: Long = -1L
+ private var logEndOffsetUpdateTimeMs: Long = -1L
def logEndOffset(newLeo: Option[Long] = None): Long = {
isLocal match {
case true =>
newLeo match {
- case Some(newOffset) => throw new KafkaException("Trying to set the leo %d for local log".format(newOffset))
+ case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset
case None => log.get.logEndOffset
}
case false =>
newLeo match {
case Some(newOffset) =>
logEndOffset = newOffset
+ logEndOffsetUpdateTimeMs = time.milliseconds
+ trace("Setting log end offset for replica %d for topic %s partition %d to %d"
+ .format(brokerId, topic, partition.partitionId, logEndOffset))
logEndOffset
case None => logEndOffset
}
}
}
- def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
- time match {
- case Some(t) =>
- leoUpdateTime = t
- leoUpdateTime
- case None =>
- leoUpdateTime
- }
- }
-
def isLocal: Boolean = {
log match {
case Some(l) => true
@@ -62,6 +57,8 @@ class Replica(val brokerId: Int,
}
}
+ def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs
+
def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
highwaterMarkOpt match {
case Some(highwaterMark) =>
@@ -69,7 +66,7 @@ class Replica(val brokerId: Int,
case true =>
trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
brokerId, highwaterMark))
- log.get.setHW(highwaterMark)
+ hw = Some(highwaterMark)
highwaterMark
case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) +
"partition %d on broker %d, since there is no local log for this partition"
@@ -78,7 +75,11 @@ class Replica(val brokerId: Int,
case None =>
isLocal match {
case true =>
- log.get.getHW()
+ hw match {
+ case Some(highWatermarkValue) => highWatermarkValue
+ case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) +
+ " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
+ }
case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) +
"partition %d on broker %d, since there is no local log for this partition"
.format(partition.partitionId, brokerId))
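The rewritten Replica above folds getter and setter into a single method that takes an Option parameter, as both logEndOffset and highWatermark now do. A minimal standalone sketch of that idiom, with a hypothetical field name:

    class OffsetHolder {
      private var offset: Long = -1L
      // Pass Some(v) to set the value; call with no argument to read it.
      def logEndOffset(newValue: Option[Long] = None): Long = newValue match {
        case Some(v) => offset = v; offset
        case None    => offset
      }
    }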
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Jul 25 23:42:07 2012
@@ -93,9 +93,8 @@ private[kafka] class ZookeeperConsumerCo
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
- private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+ private val scheduler = new KafkaScheduler(1)
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
@@ -121,8 +120,10 @@ private[kafka] class ZookeeperConsumerCo
connectZk()
createFetcher()
if (config.autoCommit) {
+ scheduler.startUp
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
- scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
+ scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
+ config.autoCommitIntervalMs, false)
}
def this(config: ConsumerConfig) = this(config, true)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Jul 25 23:42:07 2012
@@ -18,7 +18,7 @@
package kafka.log
import kafka.api.OffsetRequest
-import java.io.{IOException, RandomAccessFile, File}
+import java.io.{IOException, File}
import java.util.{Comparator, Collections, ArrayList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
import kafka.utils._
@@ -105,10 +105,8 @@ class LogSegment(val file: File, val mes
* of data to be deleted, we have to compute the offset relative to start of the log segment
* @param offset Absolute offset for this partition
*/
- def truncateUpto(offset: Long) = {
- assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d".
- format(offset, start))
- messageSet.truncateUpto(offset - start)
+ def truncateTo(offset: Long) = {
+ messageSet.truncateTo(offset - start)
}
}
@@ -134,13 +132,6 @@ private[kafka] class Log(val dir: File,
/* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
- /* create the leader highwatermark file handle */
- private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
- info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name))
-
- /* If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value */
- private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 }
-
private val logStats = new LogStats(this)
Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
@@ -221,8 +212,6 @@ private[kafka] class Log(val dir: File,
info("Closing log segment " + seg.file.getAbsolutePath)
seg.messageSet.close()
}
- checkpointHW()
- hwFile.close()
}
}
@@ -363,7 +352,6 @@ private[kafka] class Log(val dir: File,
segments.view.last.messageSet.flush()
unflushed.set(0)
lastflushedTime.set(System.currentTimeMillis)
- checkpointHW()
}
}
@@ -435,47 +423,31 @@ private[kafka] class Log(val dir: File,
total
}
- def recoverUptoLastCheckpointedHW() {
- if(hwFile.length() > 0) {
- // read the last checkpointed hw from disk
- hwFile.seek(0)
- val lastKnownHW = hwFile.readLong()
- info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW))
+ def truncateTo(targetOffset: Long) {
// find the log segment that has this hw
val segmentToBeTruncated = segments.view.find(segment =>
- lastKnownHW >= segment.start && lastKnownHW < segment.absoluteEndOffset)
+ targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
segmentToBeTruncated match {
case Some(segment) =>
- segment.truncateUpto(lastKnownHW)
- info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
+ val truncatedSegmentIndex = segments.view.indexOf(segment)
+ segments.truncLast(truncatedSegmentIndex)
+ segment.truncateTo(targetOffset)
+ info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
- assert(lastKnownHW <= logEndOffset,
+ assert(targetOffset <= segments.view.last.absoluteEndOffset,
"Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
- format(lastKnownHW, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+ format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
- .format(lastKnownHW, segments.view.head.start, logEndOffset))
+ .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
}
- val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW)
+ val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+ if(segmentsToBeDeleted.size < segments.view.size) {
val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
if(numSegmentsDeleted != segmentsToBeDeleted.size)
error("Failed to delete some segments during log recovery")
- }else
- info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
- }
-
- def setHW(latestLeaderHW: Long) {
- hw = latestLeaderHW
- }
-
- def getHW(): Long = hw
-
- def checkpointHW() {
- hwFile.seek(0)
- hwFile.writeLong(hw)
- hwFile.getChannel.force(true)
- info("Checkpointed highwatermark %d for log %s".format(hw, name))
+ }
}
def topicName():String = {
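The new Log.truncateTo above first locates the segment whose offset range contains the target, truncates that segment, and deletes any segments that start past the target. A hedged sketch of the range search alone, using a stand-in Segment type:

    // Segment is a stand-in for LogSegment; end plays the role of absoluteEndOffset (exclusive).
    case class Segment(start: Long, end: Long)

    def findSegmentFor(segments: Seq[Segment], targetOffset: Long): Option[Segment] =
      segments.find(s => targetOffset >= s.start && targetOffset < s.end)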
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Jul 25 23:42:07 2012
@@ -30,6 +30,7 @@ import kafka.common.{KafkaException, Inv
*/
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
+ scheduler: KafkaScheduler,
private val time: Time,
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
@@ -40,11 +41,9 @@ private[kafka] class LogManager(val conf
private val maxSize: Long = config.logFileSize
private val flushInterval = config.flushInterval
private val logCreationLock = new Object
- private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervals = config.flushIntervalMap
private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRetentionSize = config.logRetentionSize
- private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@@ -76,17 +75,13 @@ private[kafka] class LogManager(val conf
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
- if(scheduler.hasShutdown) {
- println("Restarting log cleaner scheduler")
- scheduler.startUp
- }
- info("starting log cleaner every " + logCleanupIntervalMs + " ms")
- scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
+ info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
+ scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
+ info("Starting log flusher every " + config.flushSchedulerThreadRate +
+ " ms with the following overrides " + logFlushIntervals)
+ scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
+ config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
}
-
- if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
- info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
- logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
}
@@ -95,7 +90,7 @@ private[kafka] class LogManager(val conf
*/
private def createLog(topic: String, partition: Int): Log = {
if (topic.length <= 0)
- throw new InvalidTopicException("topic name can't be emtpy")
+ throw new InvalidTopicException("Topic name can't be empty")
if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
val error = "Wrong partition %d, valid partitions (0, %d)."
.format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
@@ -207,10 +202,8 @@ private[kafka] class LogManager(val conf
/**
* Close all the logs
*/
- def close() {
+ def shutdown() {
info("Closing log manager")
- scheduler.shutdown()
- logFlusherScheduler.shutdown()
allLogs.foreach(_.close())
}
@@ -220,7 +213,7 @@ private[kafka] class LogManager(val conf
def allLogs() = logs.values.flatMap(_.values)
private def flushAllLogs() = {
- debug("flushing the high watermark of all logs")
+ debug("Flushing the high watermark of all logs")
for (log <- allLogs)
{
try{
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala Wed Jul 25 23:42:07 2012
@@ -36,7 +36,7 @@ class LogStats(val log: Log) extends Log
def getNumberOfSegments: Int = log.numberOfSegments
- def getCurrentOffset: Long = log.getHW()
+ def getCurrentOffset: Long = log.logEndOffset
def getNumAppendedMessages: Long = numCumulatedMessages.get
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Jul 25 23:42:07 2012
@@ -196,9 +196,12 @@ class FileMessageSet private[kafka](priv
len - validUpTo
}
- def truncateUpto(hw: Long) = {
- channel.truncate(hw)
- setSize.set(hw)
+ def truncateTo(targetSize: Long) = {
+ if(targetSize >= sizeInBytes())
+ throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
+ " size of this log segment is only %d bytes".format(sizeInBytes()))
+ channel.truncate(targetSize)
+ setSize.set(targetSize)
}
/**
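FileMessageSet.truncateTo above now rejects a target size at or beyond the current size before calling FileChannel.truncate. A minimal sketch of that guard against a plain file (the file name is hypothetical):

    import java.io.RandomAccessFile

    object TruncateSketch {
      def main(args: Array[String]) {
        val raf = new RandomAccessFile("segment.kafka", "rw")
        val channel = raf.getChannel
        val targetSize = 1024L
        // Only ever shrink the file; truncating to >= current size is treated as an error.
        if (targetSize < channel.size())
          channel.truncate(targetSize)
        raf.close()
      }
    }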
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala?rev=1365841&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala Wed Jul 25 23:42:07 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.Logging
+import java.util.concurrent.locks.ReentrantLock
+import java.io._
+
+/**
+ * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
+ * all topics and partitions that this broker hosts. The format of this file is as follows -
+ * version
+ * number of entries
+ * topic partition highwatermark
+ */
+
+object HighwaterMarkCheckpoint {
+ val highWatermarkFileName = ".highwatermark"
+ val currentHighwaterMarkFileVersion = 0
+}
+
+class HighwaterMarkCheckpoint(val path: String) extends Logging {
+ /* create the highwatermark file handle for all partitions */
+ val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
+ private val hwFile = new File(name)
+ private val hwFileLock = new ReentrantLock()
+ // recover from previous tmp file, if required
+
+ def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
+ hwFileLock.lock()
+ try {
+ // write to temp file and then swap with the highwatermark file
+ val tempHwFile = new File(hwFile + ".tmp")
+ // it is an error for this file to be present. It could mean that the previous rename operation failed
+ if(tempHwFile.exists()) {
+ fatal("Temporary high watermark %s file exists. This could mean that the ".format(tempHwFile.getAbsolutePath) +
+ "previous high watermark checkpoint operation has failed.")
+ System.exit(1)
+ }
+ val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
+ // checkpoint highwatermark for all partitions
+ // write the current version
+ hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
+ hwFileWriter.newLine()
+ // write the number of entries in the highwatermark file
+ hwFileWriter.write(highwaterMarksPerPartition.size.toString)
+ hwFileWriter.newLine()
+
+ highwaterMarksPerPartition.foreach { partitionAndHw =>
+ val topic = partitionAndHw._1._1
+ val partitionId = partitionAndHw._1._2
+ hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2))
+ hwFileWriter.newLine()
+ }
+ hwFileWriter.flush()
+ hwFileWriter.close()
+ // swap new high watermark file with previous one
+ hwFile.delete()
+ if(!tempHwFile.renameTo(hwFile)) {
+ fatal("Attempt to swap the new high watermark file with the old one failed")
+ System.exit(1)
+ }
+ }finally {
+ hwFileLock.unlock()
+ }
+ }
+
+ def read(topic: String, partition: Int): Long = {
+ hwFileLock.lock()
+ try {
+ hwFile.length() match {
+ case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+ "partition %d. Returning 0 as the highwatermark".format(partition))
+ 0L
+ case _ =>
+ val hwFileReader = new BufferedReader(new FileReader(hwFile))
+ val version = hwFileReader.readLine().toShort
+ version match {
+ case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
+ val numberOfHighWatermarks = hwFileReader.readLine().toInt
+ val partitionHighWatermarks =
+ for(i <- 0 until numberOfHighWatermarks) yield {
+ val nextHwEntry = hwFileReader.readLine()
+ val partitionHwInfo = nextHwEntry.split(" ")
+ val highwaterMark = partitionHwInfo.last.toLong
+ val partitionId = partitionHwInfo.takeRight(2).head
+ // find the index of partition
+ val partitionIndex = nextHwEntry.indexOf(partitionId)
+ val topic = nextHwEntry.substring(0, partitionIndex-1)
+ ((topic, partitionId.toInt) -> highwaterMark)
+ }
+ hwFileReader.close()
+ val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+ hwOpt match {
+ case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file"
+ .format(hw, topic, partition))
+ hw
+ case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+ "partition %d. Returning 0 as the highwatermark".format(partition))
+ 0L
+ }
+ case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
+ System.exit(1)
+ -1L
+ }
+ }
+ }finally {
+ hwFileLock.unlock()
+ }
+ }
+}
\ No newline at end of file
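The write path above gets crash safety from a write-temp-then-rename swap: if the broker dies mid-write, the old checkpoint is still intact, and a leftover .tmp file on startup signals a failed swap. A minimal sketch of the pattern (paths and error handling are illustrative, not the committed code):

    import java.io.{BufferedWriter, File, FileWriter}

    object AtomicWriteSketch {
      def writeAtomically(target: File, lines: Seq[String]) {
        val tmp = new File(target.getAbsolutePath + ".tmp")
        val writer = new BufferedWriter(new FileWriter(tmp))
        try lines.foreach { l => writer.write(l); writer.newLine() }
        finally writer.close()
        target.delete()             // drop the previous checkpoint
        if (!tmp.renameTo(target))  // swap in the freshly written file
          sys.error("failed to swap in the new checkpoint file")
      }
    }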
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Jul 25 23:42:07 2012
@@ -171,7 +171,7 @@ class KafkaApis(val requestChannel: Requ
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
- replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
+ replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Jul 25 23:42:07 2012
@@ -40,11 +40,12 @@ class KafkaServer(val config: KafkaConfi
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
- private var logManager: LogManager = null
+ var logManager: LogManager = null
var kafkaZookeeper: KafkaZooKeeper = null
- private var replicaManager: ReplicaManager = null
+ var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
var kafkaController: KafkaController = new KafkaController(config)
+ val kafkaScheduler = new KafkaScheduler(4)
var zkClient: ZkClient = null
/**
@@ -62,9 +63,13 @@ class KafkaServer(val config: KafkaConfi
cleanShutDownFile.delete
}
/* start client */
- info("connecting to ZK: " + config.zkConnect)
+ info("Connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ /* start scheduler */
+ kafkaScheduler.startUp
+ /* start log manager */
logManager = new LogManager(config,
+ kafkaScheduler,
SystemTime,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
@@ -80,7 +85,7 @@ class KafkaServer(val config: KafkaConfi
kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
- replicaManager = new ReplicaManager(config, time, zkClient)
+ replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@@ -90,6 +95,9 @@ class KafkaServer(val config: KafkaConfi
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup()
+ // start the replica manager
+ replicaManager.startup()
+ // start the controller
kafkaController.startup()
info("Server started.")
@@ -103,23 +111,21 @@ class KafkaServer(val config: KafkaConfi
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("Shutting down Kafka server with id " + config.brokerId)
+ kafkaScheduler.shutdown()
apis.close()
if(replicaManager != null)
- replicaManager.close()
+ replicaManager.shutdown()
if (socketServer != null)
socketServer.shutdown()
if(requestHandlerPool != null)
requestHandlerPool.shutdown()
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
- logManager.close()
+ logManager.shutdown()
if(kafkaController != null)
kafkaController.shutDown()
-
- kafkaZookeeper.close
- info("Closing zookeeper client...")
+ kafkaZookeeper.shutdown()
zkClient.close()
-
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Jul 25 23:42:07 2012
@@ -104,7 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
- def close() {
+ def shutdown() {
stateChangeHandler.shutdown()
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Jul 25 23:42:07 2012
@@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.server
import kafka.log.Log
@@ -25,21 +25,28 @@ import java.util.concurrent.locks.Reentr
import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
import kafka.common.{KafkaException, InvalidPartitionException}
-class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
+ extends Logging {
private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
private var leaderReplicas = new ListBuffer[Partition]()
private val leaderReplicaLock = new ReentrantLock()
- private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+ private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
+ info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
- // start ISR expiration thread
- isrExpirationScheduler.startUp
- isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs)
+ def startup() {
+ // start the highwatermark checkpoint thread
+ kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
+ config.defaultFlushIntervalMs)
+ // start ISR expiration thread
+ kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+ }
def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
- val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+ val localReplica = new Replica(config.brokerId, partition, topic, time,
+ Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
val replicaOpt = partition.getReplica(config.brokerId)
replicaOpt match {
@@ -81,12 +88,12 @@ class ReplicaManager(val config: KafkaCo
case Some(partition) => partition
case None =>
throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
- .format(topic, partitionId, config.brokerId))
+ .format(topic, partitionId, config.brokerId))
}
}
def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
- val remoteReplica = new Replica(replicaId, partition, topic)
+ val remoteReplica = new Replica(replicaId, partition, topic, time)
val replicaAdded = partition.addReplica(remoteReplica)
if(replicaAdded)
@@ -112,20 +119,17 @@ class ReplicaManager(val config: KafkaCo
Some(replicas.leaderReplica())
case None =>
throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
- "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+ "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
- def getPartition(topic: String, partitionId: Int): Option[Partition] =
- allReplicas.get((topic, partitionId))
-
- def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+ private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
- partition.updateReplicaLEO(replica, fetchOffset)
+ partition.updateReplicaLeo(replica, fetchOffset)
}
- def maybeIncrementLeaderHW(replica: Replica) {
+ private def maybeIncrementLeaderHW(replica: Replica) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
// set the leader HW to min of the leo of all replicas
@@ -141,58 +145,70 @@ class ReplicaManager(val config: KafkaCo
}
def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
- // read and cache the ISR
- replica.partition.leaderId(Some(replica.brokerId))
- replica.partition.updateISR(currentISRInZk.toSet)
- // stop replica fetcher thread, if any
- replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
- // also add this partition to the list of partitions for which the leader is the current broker
+ info("Broker %d started the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId))
try {
+ // read and cache the ISR
+ replica.partition.leaderId(Some(replica.brokerId))
+ replica.partition.updateISR(currentISRInZk.toSet)
+ // stop replica fetcher thread, if any
+ replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
+ // also add this partition to the list of partitions for which the leader is the current broker
leaderReplicaLock.lock()
leaderReplicas += replica.partition
+ info("Broker %d completed the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId))
+ }catch {
+ case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
}
def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
- info("broker %d intending to follow leader %d for topic %s partition %d"
+ info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
- // set the leader for this partition correctly on this broker
- replica.partition.leaderId(Some(leaderBrokerId))
- // remove this replica's partition from the ISR expiration queue
try {
+ // set the leader for this partition correctly on this broker
+ replica.partition.leaderId(Some(leaderBrokerId))
+ replica.log match {
+ case Some(log) => // log is already started
+ log.truncateTo(replica.highWatermark())
+ case None =>
+ }
+ // get leader for this replica
+ val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
+ val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
+ // become follower only if it is not already following the same leader
+ if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
+ info("broker %d becoming follower to leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ // stop fetcher thread to previous leader
+ replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
+ // start fetcher thread to current leader
+ replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
+ }
+ // remove this replica's partition from the ISR expiration queue
leaderReplicaLock.lock()
leaderReplicas -= replica.partition
+ info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ }catch {
+ case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
- replica.log match {
- case Some(log) => // log is already started
- log.recoverUptoLastCheckpointedHW()
- case None =>
- }
- // get leader for this replica
- val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
- val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
- // Become follower only if it is not already following the same leader
- if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
- info("broker %d becoming follower to leader %d for topic %s partition %d"
- .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
- // stop fetcher thread to previous leader
- replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
- // start fetcher thread to current leader
- replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
- }
}
- def maybeShrinkISR(): Unit = {
+ private def maybeShrinkISR(): Unit = {
try {
info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
.format(config.replicaMaxLagTimeMs))
leaderReplicaLock.lock()
leaderReplicas.foreach { partition =>
- // shrink ISR if a follower is slow or stuck
+ // shrink ISR if a follower is slow or stuck
val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
@@ -210,7 +226,7 @@ class ReplicaManager(val config: KafkaCo
}
}
- def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+ private def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
if(partition.inSyncReplicas.contains(replica)) false
else if(partition.assignedReplicas().contains(replica)) {
@@ -225,7 +241,7 @@ class ReplicaManager(val config: KafkaCo
val replicaOpt = getReplica(topic, partition, replicaId)
replicaOpt match {
case Some(replica) =>
- updateReplicaLEO(replica, offset)
+ updateReplicaLeo(replica, offset)
// check if this replica needs to be added to the ISR
if(checkIfISRCanBeExpanded(replica)) {
val newISR = replica.partition.inSyncReplicas + replica
@@ -239,20 +255,45 @@ class ReplicaManager(val config: KafkaCo
}
}
- def recordLeaderLogUpdate(topic: String, partition: Int) = {
+ def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = {
val replicaOpt = getReplica(topic, partition, config.brokerId)
replicaOpt match {
- case Some(replica) =>
- replica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ case Some(replica) => replica.logEndOffset(Some(logEndOffset))
case None =>
throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
}
}
- def close() {
- info("Closing replica manager on broker " + config.brokerId)
- isrExpirationScheduler.shutdown()
+ /**
+ * Flushes the highwatermark value for all partitions to the highwatermark file
+ */
+ private def checkpointHighwaterMarks() {
+ val highwaterMarksForAllPartitions = allReplicas.map { partition =>
+ val topic = partition._1._1
+ val partitionId = partition._1._2
+ val localReplicaOpt = partition._2.getReplica(config.brokerId)
+ val hw = localReplicaOpt match {
+ case Some(localReplica) => localReplica.highWatermark()
+ case None =>
+ error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+ " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+ 0L
+ }
+ (topic, partitionId) -> hw
+ }.toMap
+ highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
+ info("Checkpointed highwatermarks")
+ }
+
+ /**
+ * Reads the checkpointed highWatermarks for all partitions
+ * @return checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0
+ */
+ def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
+
+ def shutdown() {
replicaFetcherManager.shutdown()
+ checkpointHighwaterMarks()
info("Replica manager shutdown on broker " + config.brokerId)
}
}
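checkpointHighwaterMarks above reduces every hosted partition to a (topic, partition) -> highwatermark map before handing it to HighwaterMarkCheckpoint.write. A self-contained sketch of that aggregation, with LocalReplica standing in for kafka.cluster.Replica and made-up values:

    case class LocalReplica(hw: Long) // stand-in for kafka.cluster.Replica

    object CheckpointMapSketch {
      def main(args: Array[String]) {
        val allReplicas = Map(("topic-a", 0) -> LocalReplica(42L),
                              ("topic-a", 1) -> LocalReplica(17L))
        val highwaterMarks: Map[(String, Int), Long] =
          allReplicas.map { case ((topic, partition), r) => (topic, partition) -> r.hw }.toMap
        println(highwaterMarks) // the shape HighwaterMarkCheckpoint.write expects
      }
    }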
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala Wed Jul 25 23:42:07 2012
@@ -17,7 +17,6 @@
package kafka.utils
-import kafka.common.KafkaException
import java.lang.IllegalStateException
class State
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Wed Jul 25 23:42:07 2012
@@ -18,20 +18,24 @@
package kafka.utils
import java.util.concurrent._
-import java.util.concurrent.atomic._
+import atomic._
+import collection.mutable.HashMap
/**
* A scheduler for running jobs in the background
*/
-class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
- private val threadId = new AtomicLong(0)
+class KafkaScheduler(val numThreads: Int) extends Logging {
private var executor:ScheduledThreadPoolExecutor = null
- startUp
+ private val daemonThreadFactory = new ThreadFactory() {
+ def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
+ }
+ private val nonDaemonThreadFactory = new ThreadFactory() {
+ def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
+ }
+ private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
def startUp = {
- executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
- def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable)
- })
+ executor = new ScheduledThreadPoolExecutor(numThreads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
}
@@ -43,20 +47,26 @@ class KafkaScheduler(val numThreads: Int
throw new IllegalStateException("Kafka scheduler has not been started")
}
- def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
+ def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
ensureExecutorHasStarted
- executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
+ if(isDaemon)
+ executor.setThreadFactory(daemonThreadFactory)
+ else
+ executor.setThreadFactory(nonDaemonThreadFactory)
+ val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
+ executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
+ TimeUnit.MILLISECONDS)
}
def shutdownNow() {
ensureExecutorHasStarted
executor.shutdownNow()
- info("Forcing shutdown of scheduler " + baseThreadName)
+ info("Forcing shutdown of Kafka scheduler")
}
def shutdown() {
ensureExecutorHasStarted
executor.shutdown()
- info("Shutdown scheduler " + baseThreadName)
+ info("Shutdown Kafka scheduler")
}
}
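With the refactor above, callers share one KafkaScheduler and pass a thread-name prefix and daemon flag per task, instead of constructing a dedicated scheduler each time. A hedged usage sketch (the task body and intervals are illustrative):

    object SchedulerSketch {
      def main(args: Array[String]) {
        val scheduler = new kafka.utils.KafkaScheduler(1)
        scheduler.startUp // must be called before scheduleWithRate
        scheduler.scheduleWithRate(() => println("tick"), "demo-", 0, 1000, isDaemon = false)
        Thread.sleep(3000)
        scheduler.shutdown()
      }
    }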
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Jul 25 23:42:07 2012
@@ -58,9 +58,10 @@ object Utils extends Logging {
* @param fun A function
* @return A Runnable that just executes the function
*/
- def loggedRunnable(fun: () => Unit): Runnable =
+ def loggedRunnable(fun: () => Unit, name: String): Runnable =
new Runnable() {
def run() = {
+ Thread.currentThread().setName(name)
try {
fun()
}
@@ -74,6 +75,14 @@ object Utils extends Logging {
/**
* Create a daemon thread
+ * @param runnable The runnable to execute in the background
+ * @return The unstarted thread
+ */
+ def daemonThread(runnable: Runnable): Thread =
+ newThread(runnable, true)
+
+ /**
+ * Create a daemon thread
* @param name The name of the thread
* @param runnable The runnable to execute in the background
* @return The unstarted thread
@@ -109,6 +118,23 @@ object Utils extends Logging {
}
/**
+ * Create a new thread
+ * @param runnable The work for the thread to do
+ * @param daemon Should the thread block JVM shutdown?
+ * @return The unstarted thread
+ */
+ def newThread(runnable: Runnable, daemon: Boolean): Thread = {
+ val thread = new Thread(runnable)
+ thread.setDaemon(daemon)
+ thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ def uncaughtException(t: Thread, e: Throwable) {
+ error("Uncaught exception in thread '" + t.getName + "':", e)
+ }
+ })
+ thread
+ }
+
+ /**
* Read a byte array from the given offset and size in the buffer
* TODO: Should use System.arraycopy
*/
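The new Utils.newThread above centralizes the daemon flag and installs an uncaught-exception handler that logs failures instead of dying silently. A hedged usage sketch:

    object NewThreadSketch {
      def main(args: Array[String]) {
        val worker = kafka.utils.Utils.newThread(new Runnable {
          def run() { println("background work") } // hypothetical task
        }, false) // non-daemon: keeps the JVM alive until run() returns
        worker.start()
        worker.join()
      }
    }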
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Wed Jul 25 23:42:07 2012
@@ -37,6 +37,7 @@ class LogManagerTest extends JUnit3Suite
val zookeeperConnect = TestZKUtils.zookeeperConnect
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
+ val scheduler = new KafkaScheduler(2)
override def setUp() {
super.setUp()
@@ -45,7 +46,8 @@ class LogManagerTest extends JUnit3Suite
override val logFileSize = 1024
override val flushInterval = 100
}
- logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+ scheduler.startUp
+ logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
logDir = logManager.logDir
@@ -56,8 +58,9 @@ class LogManagerTest extends JUnit3Suite
}
override def tearDown() {
+ scheduler.shutdown()
if(logManager != null)
- logManager.close()
+ logManager.shutdown()
Utils.rm(logDir)
super.tearDown()
}
@@ -114,7 +117,7 @@ class LogManagerTest extends JUnit3Suite
val retentionHours = 1
val retentionMs = 1000 * 60 * 60 * retentionHours
val props = TestUtils.createBrokerConfig(0, -1)
- logManager.close
+ logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
@@ -122,7 +125,7 @@ class LogManagerTest extends JUnit3Suite
override val logRetentionHours = retentionHours
override val flushInterval = 100
}
- logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false)
+ logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, retentionMs, false)
logManager.startup
// create a log
@@ -159,7 +162,7 @@ class LogManagerTest extends JUnit3Suite
@Test
def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1)
- logManager.close
+ logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = 1024 *1024 *1024
@@ -167,7 +170,7 @@ class LogManagerTest extends JUnit3Suite
override val flushInterval = Int.MaxValue
override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
}
- logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+ logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
for(i <- 0 until 200) {
@@ -182,15 +185,14 @@ class LogManagerTest extends JUnit3Suite
@Test
def testConfigurablePartitions() {
val props = TestUtils.createBrokerConfig(0, -1)
- logManager.close
+ logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = 256
override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
override val flushInterval = 100
}
-
- logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+ logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
for(i <- 0 until 1) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Jul 25 23:42:07 2012
@@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.producer
@@ -35,7 +35,7 @@ import java.util
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId1 = 0
- private val brokerId2 = 1
+ private val brokerId2 = 1
private val ports = TestUtils.choosePorts(2)
private val (port1, port2) = (ports(0), ports(1))
private var server1: KafkaServer = null
@@ -80,7 +80,7 @@ class ProducerTest extends JUnit3Suite w
server2.shutdown
server2.awaitShutdown()
Utils.rm(server1.config.logDir)
- Utils.rm(server2.config.logDir)
+ Utils.rm(server2.config.logDir)
Thread.sleep(500)
super.tearDown()
}
@@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite w
val props2 = new util.Properties()
props2.putAll(props1)
props2.put("producer.request.required.acks", "3")
- props2.put("producer.request.ack.timeout.ms", "1000")
+ props2.put("producer.request.timeout.ms", "1000")
val config1 = new ProducerConfig(props1)
val config2 = new ProducerConfig(props2)
@@ -108,33 +108,28 @@ class ProducerTest extends JUnit3Suite w
val producer1 = new Producer[String, String](config1)
val producer2 = new Producer[String, String](config2)
- try {
- // Available partition ids should be 0.
- producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
- producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
- // get the leader
- val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
- assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
- val leader = leaderOpt.get
-
- val messageSet = if(leader == server1.config.brokerId) {
- val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- response1.messageSet("new-topic", 0).iterator
- }else {
- val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
- response2.messageSet("new-topic", 0).iterator
- }
- assertTrue("Message set should have 1 message", messageSet.hasNext)
-
- assertEquals(new Message("test1".getBytes), messageSet.next.message)
- assertTrue("Message set should have 1 message", messageSet.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet.next.message)
- assertFalse("Message set should not have any more messages", messageSet.hasNext)
- } catch {
- case e: Exception => fail("Not expected", e)
- } finally {
- producer1.close()
- }
+ // Available partition ids should be 0.
+ producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+ producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+ // get the leader
+ val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+ assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+ val leader = leaderOpt.get
+
+ val messageSet = if(leader == server1.config.brokerId) {
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ response1.messageSet("new-topic", 0).iterator
+ }else {
+ val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ response2.messageSet("new-topic", 0).iterator
+ }
+ assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+ assertEquals(new Message("test1".getBytes), messageSet.next.message)
+ assertTrue("Message set should have 1 message", messageSet.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet.next.message)
+ assertFalse("Message set should not have any more messages", messageSet.hasNext)
+ producer1.close()
try {
producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1365841&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Wed Jul 25 23:42:07 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+import org.easymock.EasyMock
+import org.junit.Assert._
+import kafka.utils.{KafkaScheduler, TestUtils, MockTime}
+import kafka.common.KafkaException
+
+class HighwatermarkPersistenceTest extends JUnit3Suite {
+
+ val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+ override val defaultFlushIntervalMs = 100
+ })
+ val topic = "foo"
+
+ def testHighWatermarkPersistenceSinglePartition() {
+ // mock zkclient
+ val zkClient = EasyMock.createMock(classOf[ZkClient])
+ EasyMock.replay(zkClient)
+ // create kafka scheduler
+ val scheduler = new KafkaScheduler(2)
+ scheduler.startUp
+ // create replica manager
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+ replicaManager.startup()
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+ assertEquals(0L, fooPartition0Hw)
+ val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+ // create leader log
+ val log0 = getMockLog
+ // create leader and follower replicas
+ val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+ val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+ assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+ try {
+ followerReplicaPartition0.highWatermark()
+ fail("Should fail with IllegalStateException")
+ }catch {
+ case e: KafkaException => // this is ok
+ }
+ // set the leader
+ partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
+ // set the highwatermark for local replica
+ partition0.leaderHW(Some(5L))
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+ assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+ EasyMock.verify(zkClient)
+ EasyMock.verify(log0)
+ }
+
+ def testHighWatermarkPersistenceMultiplePartitions() {
+ val topic1 = "foo1"
+ val topic2 = "foo2"
+ // mock zkclient
+ val zkClient = EasyMock.createMock(classOf[ZkClient])
+ EasyMock.replay(zkClient)
+ // create kafka scheduler
+ val scheduler = new KafkaScheduler(2)
+ scheduler.startUp
+ // create replica manager
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+ replicaManager.startup()
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ assertEquals(0L, topic1Partition0Hw)
+ val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+ // create leader log
+ val topic1Log0 = getMockLog
+ // create leader and follower replicas
+ val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
+ // set the leader
+ topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+ // set the highwatermark for local replica
+ topic1Partition0.leaderHW(Some(5L))
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+ assertEquals(5L, topic1Partition0Hw)
+ // add another partition and set highwatermark
+ val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+ // create leader log
+ val topic2Log0 = getMockLog
+ // create leader and follower replicas
+ val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+ assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
+ // set the leader
+ topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+ // set the highwatermark for local replica
+ topic2Partition0.leaderHW(Some(15L))
+ assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+ // change the highwatermark for topic1
+ topic1Partition0.leaderHW(Some(10L))
+ assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
+ // sleep for the flush interval
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
+ // verify checkpointed hw for topic 2
+ topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+ assertEquals(15L, topic2Partition0Hw)
+ // verify checkpointed hw for topic 1
+ topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ assertEquals(10L, topic1Partition0Hw)
+ EasyMock.verify(zkClient)
+ EasyMock.verify(topic1Log0)
+ EasyMock.verify(topic2Log0)
+ }
+
+ private def getMockLog: Log = {
+ val log = EasyMock.createMock(classOf[kafka.log.Log])
+ EasyMock.replay(log)
+ log
+ }
+}
\ No newline at end of file
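
The new test above exercises the contract introduced by this commit: a single checkpoint file per log directory that records the high watermark of every partition, read back through replicaManager.readCheckpointedHighWatermark. The committed implementation lives in HighwaterMarkCheckpoint.scala; the sketch below only illustrates the read/write shape the assertions depend on, under the assumption of a simple line-oriented format (one "topic partition highwatermark" line per partition), which may differ from the real file layout:

    import java.io._
    import scala.io.Source

    class HighwatermarkCheckpointSketch(logDir: String) {
      private val file = new File(logDir, ".highwatermark")

      // persist the high watermark of every partition in one file
      def write(highWatermarks: Map[(String, Int), Long]) {
        val writer = new BufferedWriter(new FileWriter(file))
        try {
          for (((topic, partition), hw) <- highWatermarks)
            writer.write(topic + " " + partition + " " + hw + "\n")
        } finally {
          writer.close()
        }
      }

      // a broker with no checkpoint yet reports 0, matching the first
      // assertEquals(0L, ...) in the test
      def read(topic: String, partition: Int): Long = {
        if (!file.exists) return 0L
        val source = Source.fromFile(file)
        try {
          source.getLines().map(_.split(" "))
                .find(f => f(0) == topic && f(1).toInt == partition)
                .map(_(2).toLong)
                .getOrElse(0L)
        } finally {
          source.close()
        }
      }
    }
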
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Wed Jul 25 23:42:07 2012
@@ -22,9 +22,9 @@ import collection.mutable.Map
import kafka.cluster.{Partition, Replica}
import org.easymock.EasyMock
import kafka.log.Log
-import kafka.utils.{Time, MockTime, TestUtils}
import org.junit.Assert._
import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils}
class ISRExpirationTest extends JUnit3Suite {
@@ -40,7 +40,6 @@ class ISRExpirationTest extends JUnit3Su
// create leader replica
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
- EasyMock.expect(log.setHW(5L)).times(1)
EasyMock.replay(log)
// add one partition
@@ -48,10 +47,10 @@ class ISRExpirationTest extends JUnit3Su
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 2
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
time.sleep(150)
- leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ leaderReplica.logEndOffset(Some(5L))
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -60,9 +59,9 @@ class ISRExpirationTest extends JUnit3Su
partition0.inSyncReplicas ++= partition0.assignedReplicas()
assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
- leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ leaderReplica.logEndOffset(Some(5L))
+ // let the follower catch up only up to 3
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
time.sleep(150)
// now follower broker id 1 has caught up to only 3, while the leader is at 5 AND follower broker id 1 hasn't
// pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
@@ -80,12 +79,12 @@ class ISRExpirationTest extends JUnit3Su
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 4
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+ (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
time.sleep(150)
- leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+ leaderReplica.logEndOffset(Some(15L))
time.sleep(10)
- (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+ (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -98,8 +97,10 @@ class ISRExpirationTest extends JUnit3Su
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
EasyMock.replay(zkClient)
+ // create kafka scheduler
+ val scheduler = new KafkaScheduler(2)
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+ val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
try {
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
// create leader log
@@ -115,7 +116,7 @@ class ISRExpirationTest extends JUnit3Su
partition0.leaderHW(Some(5L))
// set the leo for non-leader replicas to something low
- (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+ (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
val log1 = getLogWithHW(15L)
// create leader and follower replicas for partition 1
@@ -129,13 +130,13 @@ class ISRExpirationTest extends JUnit3Su
partition1.leaderHW(Some(15L))
// set the leo for non-leader replicas to something low
- (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+ (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
time.sleep(150)
- leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
- leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+ leaderReplicaPartition0.logEndOffset(Some(4L))
+ leaderReplicaPartition1.logEndOffset(Some(4L))
time.sleep(10)
- (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+ (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -148,16 +149,16 @@ class ISRExpirationTest extends JUnit3Su
}catch {
case e => e.printStackTrace()
}finally {
- replicaManager.close()
+ replicaManager.shutdown()
}
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
localLog: Log, leaderHW: Long): Partition = {
val partition = new Partition(topic, partitionId, time)
- val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+ val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
- val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+ val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
partition.assignedReplicas(Some(allReplicas.toSet))
// set in sync replicas for this partition to all the assigned replicas
partition.inSyncReplicas = allReplicas.toSet
@@ -170,15 +171,14 @@ class ISRExpirationTest extends JUnit3Su
private def getLogWithHW(hw: Long): Log = {
val log1 = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
- EasyMock.expect(log1.setHW(hw)).times(1)
EasyMock.replay(log1)
log1
}
- private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+ private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
configs.filter(_.brokerId != leaderId).map { config =>
- new Replica(config.brokerId, partition, topic)
+ new Replica(config.brokerId, partition, topic, time)
}
}
}
\ No newline at end of file
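
One pattern worth noting in the ISRExpirationTest diff: the test no longer pokes logEndOffsetUpdateTime directly. Since Replica now takes a Time instance, a logEndOffset update can timestamp itself, so driving offsets through updateReplicaLeo and logEndOffset is enough to simulate stuck and slow followers. The standalone sketch below shows the lag classification the assertions exercise; it is an assumed shape, not the committed Partition.getOutOfSyncReplicas:

    // a follower falls out of sync when it is "stuck" (no log-end-offset
    // update for longer than maxLagTimeMs) or "slow" (more than maxLagBytes
    // behind the leader's log end offset)
    case class ReplicaView(brokerId: Int, logEndOffset: Long, leoUpdateTimeMs: Long)

    def outOfSyncReplicas(leader: ReplicaView, followers: Set[ReplicaView], nowMs: Long,
                          maxLagTimeMs: Long, maxLagBytes: Long): Set[ReplicaView] = {
      val stuck = followers.filter(r => (nowMs - r.leoUpdateTimeMs) > maxLagTimeMs)
      val slow  = followers.filter(r => (leader.logEndOffset - r.logEndOffset) > maxLagBytes)
      stuck ++ slow
    }
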
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Wed Jul 25 23:42:07 2012
@@ -7,7 +7,6 @@ import kafka.utils.TestUtils._
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.message.Message
-import java.io.RandomAccessFile
import kafka.producer.{ProducerConfig, ProducerData, Producer}
import org.junit.Test
@@ -34,15 +33,12 @@ class LogRecoveryTest extends JUnit3Suit
val configProps1 = configs.head
val configProps2 = configs.last
- val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
- val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
-
val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
var producer: Producer[Int, Message] = null
- var hwFile1: RandomAccessFile = null
- var hwFile2: RandomAccessFile = null
+ var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
+ var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@Test
@@ -66,22 +62,15 @@ class LogRecoveryTest extends JUnit3Suit
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
- sendMessages()
-
- hwFile1 = new RandomAccessFile(server1HWFile, "r")
- hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
- sendMessages()
+ sendMessages(2)
// don't wait for follower to read the leader's hw
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
- val leaderHW = readHW(hwFile1)
+ val leaderHW = hwFile1.read(topic, 0)
assertEquals(60L, leaderHW)
- val followerHW = readHW(hwFile2)
+ val followerHW = hwFile2.read(topic, 0)
assertEquals(30L, followerHW)
- hwFile1.close()
- hwFile2.close()
servers.map(server => Utils.rm(server.config.logDir))
}
@@ -105,16 +94,13 @@ class LogRecoveryTest extends JUnit3Suit
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
- hwFile1 = new RandomAccessFile(server1HWFile, "r")
- hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
- assertEquals(0L, readHW(hwFile1))
+ assertEquals(0L, hwFile1.read(topic, 0))
sendMessages()
// kill the server hosting the preferred replica
server1.shutdown()
- assertEquals(30L, readHW(hwFile1))
+ assertEquals(30L, hwFile1.read(topic, 0))
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -126,10 +112,10 @@ class LogRecoveryTest extends JUnit3Suit
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
- assertEquals(30L, readHW(hwFile1))
+ assertEquals(30L, hwFile1.read(topic, 0))
// since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
server2.shutdown()
- assertEquals(30L, readHW(hwFile2))
+ assertEquals(30L, hwFile2.read(topic, 0))
server2.startup()
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -137,17 +123,13 @@ class LogRecoveryTest extends JUnit3Suit
sendMessages()
// give some time for follower 1 to record leader HW of 60
- Thread.sleep(500)
+ TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
+
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
- Thread.sleep(200)
producer.close()
- assert(hwFile1.length() > 0)
- assert(hwFile2.length() > 0)
- assertEquals(60L, readHW(hwFile1))
- assertEquals(60L, readHW(hwFile2))
- hwFile1.close()
- hwFile2.close()
+ assertEquals(60L, hwFile1.read(topic, 0))
+ assertEquals(60L, hwFile2.read(topic, 0))
servers.map(server => Utils.rm(server.config.logDir))
}
@@ -160,14 +142,14 @@ class LogRecoveryTest extends JUnit3Suit
override val logFileSize = 30
})
- val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
- val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
+ hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+ hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
@@ -181,25 +163,16 @@ class LogRecoveryTest extends JUnit3Suit
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
-
- sendMessages(10)
-
- hwFile1 = new RandomAccessFile(server1HWFile, "r")
- hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
- sendMessages(10)
-
+ sendMessages(20)
// give some time for follower 1 to record leader HW of 600
- Thread.sleep(500)
+ TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
- val leaderHW = readHW(hwFile1)
+ val leaderHW = hwFile1.read(topic, 0)
assertEquals(600L, leaderHW)
- val followerHW = readHW(hwFile2)
+ val followerHW = hwFile2.read(topic, 0)
assertEquals(600L, followerHW)
- hwFile1.close()
- hwFile2.close()
servers.map(server => Utils.rm(server.config.logDir))
}
@@ -208,19 +181,18 @@ class LogRecoveryTest extends JUnit3Suit
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val flushInterval = 1000
- override val flushSchedulerThreadRate = 10
override val replicaMinBytes = 20
override val logFileSize = 30
})
- val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
- val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
+ hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+ hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
@@ -235,43 +207,36 @@ class LogRecoveryTest extends JUnit3Suit
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
- val hwFile1 = new RandomAccessFile(server1HWFile, "r")
- val hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
sendMessages(2)
// allow some time for the follower to get the leader HW
- Thread.sleep(1000)
+ TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
// kill the server hosting the preferred replica
server1.shutdown()
server2.shutdown()
- assertEquals(60L, readHW(hwFile1))
- assertEquals(60L, readHW(hwFile2))
+ assertEquals(60L, hwFile1.read(topic, 0))
+ assertEquals(60L, hwFile2.read(topic, 0))
server2.startup()
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
- assertEquals(60L, readHW(hwFile1))
+ assertEquals(60L, hwFile1.read(topic, 0))
// bring the preferred replica back
server1.startup()
- assertEquals(60L, readHW(hwFile1))
- assertEquals(60L, readHW(hwFile2))
+ assertEquals(60L, hwFile1.read(topic, 0))
+ assertEquals(60L, hwFile2.read(topic, 0))
sendMessages(2)
// allow some time for the follower to get the leader HW
- Thread.sleep(1000)
+ TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
- assert(hwFile1.length() > 0)
- assert(hwFile2.length() > 0)
- assertEquals(120L, readHW(hwFile1))
- assertEquals(120L, readHW(hwFile2))
- hwFile1.close()
- hwFile2.close()
+ assertEquals(120L, hwFile1.read(topic, 0))
+ assertEquals(120L, hwFile2.read(topic, 0))
servers.map(server => Utils.rm(server.config.logDir))
}
@@ -280,9 +245,4 @@ class LogRecoveryTest extends JUnit3Suit
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
}
}
-
- private def readHW(hwFile: RandomAccessFile): Long = {
- hwFile.seek(0)
- hwFile.readLong()
- }
}
\ No newline at end of file
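
Throughout LogRecoveryTest the fixed Thread.sleep delays are replaced with TestUtils.waitUntilTrue, which polls a condition until it holds or a wait budget expires, so the test proceeds as soon as the follower's high watermark catches up instead of guessing a safe delay. A sketch of such a helper, assuming the real TestUtils.waitUntilTrue has roughly this shape:

    def waitUntilTrue(condition: () => Boolean, waitTimeMs: Long): Boolean = {
      val startTimeMs = System.currentTimeMillis
      while (true) {
        if (condition())
          return true
        if (System.currentTimeMillis > startTimeMs + waitTimeMs)
          return false
        // re-check a few times within the budget rather than sleeping once
        Thread.sleep(waitTimeMs / 4)
      }
      throw new RuntimeException("unreachable, but satisfies the type checker")
    }

    // usage, as in the diff above:
    // TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
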