You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/02 20:01:39 UTC
svn commit: r1405102 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/cluster/ main/scala/kafka/log/ main/scala/kafka/server/
main/scala/kafka/utils/ test/scala/unit/kafka/integration/
test/scala/unit/kafka/log/ test/scala/unit/kafka/produ...
Author: jkreps
Date: Fri Nov 2 19:01:38 2012
New Revision: 1405102
URL: http://svn.apache.org/viewvc?rev=1405102&view=rev
Log:
KAFKA-188 Support multiple data directories.
Patch reviewed by Neha and Jun.
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Fri Nov 2 19:01:38 2012
@@ -37,7 +37,6 @@ class Partition(val topic: String,
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
private val replicaFetcherManager = replicaManager.replicaFetcherManager
- private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint
private val zkClient = replicaManager.zkClient
var leaderReplicaIdOpt: Option[Int] = None
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -69,8 +68,8 @@ class Partition(val topic: String,
case None =>
if (isReplicaLocal(replicaId)) {
val log = logManager.getOrCreateLog(topic, partitionId)
- val localReplica = new Replica(replicaId, this, time,
- highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log))
+ val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset)
+ val localReplica = new Replica(replicaId, this, time, offset, Some(log))
addReplicaIfNotExists(localReplica)
}
else {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Nov 2 19:01:38 2012
@@ -26,61 +26,107 @@ import kafka.server.{HighwaterMarkCheckp
/**
- * The guy who creates and hands out logs
+ * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
+ * All read and write operations are delegated to the individual log instances.
+ *
+ * The log manager maintains logs in one or more directories. New logs are created in the data directory
+ * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
+ * size or I/O rate.
+ *
+ * A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
scheduler: KafkaScheduler,
- private val time: Time,
- val logRollDefaultIntervalMs: Long,
- val logCleanupIntervalMs: Long,
- val logCleanupDefaultAgeMs: Long,
- needRecovery: Boolean) extends Logging {
+ private val time: Time) extends Logging {
- val logDir: File = new File(config.logDir)
+ val CleanShutdownFile = ".kafka_cleanshutdown"
+ val LockFile = ".lock"
+ val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
private val logFileSizeMap = config.logFileSizeMap
- private val flushInterval = config.flushInterval
- private val logCreationLock = new Object
+ private val logFlushInterval = config.flushInterval
private val logFlushIntervals = config.flushIntervalMap
+ private val logCreationLock = new Object
private val logRetentionSizeMap = config.logRetentionSizeMap
private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
- this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
+ private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
+ private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+ private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
- /* Initialize a log for each subdirectory of the main log directory */
- private val logs = new Pool[String, Pool[Int, Log]]()
- if(!logDir.exists()) {
- info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
- logDir.mkdirs()
- }
- if(!logDir.isDirectory() || !logDir.canRead())
- throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.")
- val subDirs = logDir.listFiles()
- if(subDirs != null) {
- for(dir <- subDirs) {
- if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){
- // skip valid metadata file
+ this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
+ private val logs = new Pool[TopicAndPartition, Log]()
+
+ createAndValidateLogDirs(logDirs)
+ private var dirLocks = lockLogDirs(logDirs)
+ loadLogs(logDirs)
+
+ /**
+ * 1. Ensure that there are no duplicates in the directory list
+ * 2. Create each directory if it doesn't exist
+ * 3. Check that each path is a readable directory
+ */
+ private def createAndValidateLogDirs(dirs: Seq[File]) {
+ if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
+ throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
+ for(dir <- dirs) {
+ if(!dir.exists) {
+ info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
+ val created = dir.mkdirs()
+ if(!created)
+ throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
}
- else if(!dir.isDirectory()) {
- warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
- } else {
- info("Loading log '" + dir.getName() + "'")
- val topicPartition = parseTopicPartitionName(dir.getName)
- val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
- val log = new Log(dir,
- maxLogFileSize,
- config.maxMessageSize,
- flushInterval,
- rollIntervalMs,
- needRecovery,
- config.logIndexMaxSizeBytes,
- config.logIndexIntervalBytes,
- time,
- config.brokerId)
- logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]())
- val parts = logs.get(topicPartition.topic)
- parts.put(topicPartition.partition, log)
+ if(!dir.isDirectory || !dir.canRead)
+ throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
+ }
+ }
+
+ /**
+ * Lock all the given directories
+ */
+ private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
+ dirs.map { dir =>
+ val lock = new FileLock(new File(dir, LockFile))
+ if(!lock.tryLock())
+ throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
+ ". A Kafka instance in another process or thread is using this directory.")
+ lock
+ }
+ }
+
+ /**
+ * Recover and load all logs in the given data directories
+ */
+ private def loadLogs(dirs: Seq[File]) {
+ for(dir <- dirs) {
+ /* check if this set of logs was shut down cleanly */
+ val cleanShutDownFile = new File(dir, CleanShutdownFile)
+ val needsRecovery = cleanShutDownFile.exists
+ cleanShutDownFile.delete
+ /* load the logs */
+ val subDirs = dir.listFiles()
+ if(subDirs != null) {
+ for(dir <- subDirs) {
+ if(dir.isDirectory){
+ info("Loading log '" + dir.getName + "'")
+ val topicPartition = parseTopicPartitionName(dir.getName)
+ val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
+ val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+ val log = new Log(dir,
+ maxLogFileSize,
+ config.maxMessageSize,
+ logFlushInterval,
+ rollIntervalMs,
+ needsRecovery,
+ config.logIndexMaxSizeBytes,
+ config.logIndexIntervalBytes,
+ time,
+ config.brokerId)
+ val previous = this.logs.put(topicPartition, log)
+ if(previous != null)
+ throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ }
+ }
}
}
}
@@ -99,18 +145,81 @@ private[kafka] class LogManager(val conf
config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
}
}
+
+ /**
+ * Get the log if it exists
+ */
+ def getLog(topic: String, partition: Int): Option[Log] = {
+ val topicAndPartiton = TopicAndPartition(topic, partition)
+ val log = logs.get(topicAndPartiton)
+ if (log == null)
+ None
+ else
+ Some(log)
+ }
+ /**
+ * Create the log if it does not exist, if it exists just return it
+ */
+ def getOrCreateLog(topic: String, partition: Int): Log = {
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ logs.get(topicAndPartition) match {
+ case null => createLogIfNotExists(topicAndPartition)
+ case log: Log => log
+ }
+ }
/**
* Create a log for the given topic and the given partition
+ * If the log already exists, just return the existing log
*/
- private def createLog(topic: String, partition: Int): Log = {
+ private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
logCreationLock synchronized {
- val d = new File(logDir, topic + "-" + partition)
- d.mkdirs()
- val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
- new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId)
+ var log = logs.get(topicAndPartition)
+
+ // check if the log has already been created in another thread
+ if(log != null)
+ return log
+
+ // if not, create it
+ val dataDir = nextLogDir()
+ val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
+ dir.mkdirs()
+ val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
+ val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+ log = new Log(dir,
+ maxLogFileSize,
+ config.maxMessageSize,
+ logFlushInterval,
+ rollIntervalMs,
+ needsRecovery = false,
+ config.logIndexMaxSizeBytes,
+ config.logIndexIntervalBytes,
+ time,
+ config.brokerId)
+ info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
+ logs.put(topicAndPartition, log)
+ log
+ }
+ }
+
+ /**
+ * Choose the next directory in which to create a log. Currently this is done
+ * by calculating the number of partitions in each directory and then choosing the
+ * data directory with the fewest partitions.
+ */
+ private def nextLogDir(): File = {
+ if(logDirs.size == 1) {
+ logDirs(0)
+ } else {
+ // count the number of logs in each parent directory (including 0 for empty directories)
+ val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
+ val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
+ var dirCounts = (zeros ++ logCounts).toBuffer
+
+ // choose the directory with the least logs in it
+ val leastLoaded = dirCounts.sortBy(_._2).head
+ new File(leastLoaded._1)
}
}
@@ -123,48 +232,8 @@ private[kafka] class LogManager(val conf
}
/**
- * Get the log if it exists
+ * Runs through the log removing segments older than a certain age
*/
- def getLog(topic: String, partition: Int): Option[Log] = {
- val parts = logs.get(topic)
- if (parts == null) None
- else {
- val log = parts.get(partition)
- if(log == null) None
- else Some(log)
- }
- }
-
- /**
- * Create the log if it does not exist, if it exists just return it
- */
- def getOrCreateLog(topic: String, partition: Int): Log = {
- var hasNewTopic = false
- var parts = logs.get(topic)
- if (parts == null) {
- val found = logs.putIfNotExists(topic, new Pool[Int, Log])
- if (found == null)
- hasNewTopic = true
- parts = logs.get(topic)
- }
- var log = parts.get(partition)
- if(log == null) {
- // check if this broker hosts this partition
- log = createLog(topic, partition)
- val found = parts.putIfNotExists(partition, log)
- if(found != null) {
- // there was already somebody there
- log.close()
- log = found
- }
- else
- info("Created log for '" + topic + "'-" + partition)
- }
-
- log
- }
-
- /* Runs through the log removing segments older than a certain age */
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = parseTopicPartitionName(log.name).topic
@@ -216,14 +285,22 @@ private[kafka] class LogManager(val conf
*/
def shutdown() {
debug("Shutting down.")
- allLogs.foreach(_.close())
+ try {
+ // close the logs
+ allLogs.foreach(_.close())
+ // mark that the shutdown was clean by creating the clean shutdown marker file
+ logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
+ } finally {
+ // regardless of whether the close succeeded, we need to unlock the data directories
+ dirLocks.foreach(_.destroy())
+ }
debug("Shutdown complete.")
}
/**
* Get all the partition logs
*/
- def allLogs() = logs.values.flatMap(_.values)
+ def allLogs(): Iterable[Log] = logs.values
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
@@ -253,12 +330,11 @@ private[kafka] class LogManager(val conf
}
}
-
- def topics(): Iterable[String] = logs.keys
-
private def parseTopicPartitionName(name: String): TopicAndPartition = {
val index = name.lastIndexOf('-')
TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
}
+ def topics(): Iterable[String] = logs.keys.map(_.topic)
+
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala Fri Nov 2 19:01:38 2012
@@ -17,6 +17,7 @@
package kafka.server
import kafka.utils.Logging
+import kafka.common._
import java.util.concurrent.locks.ReentrantLock
import java.io._
@@ -40,7 +41,7 @@ class HighwaterMarkCheckpoint(val path:
private val hwFileLock = new ReentrantLock()
// recover from previous tmp file, if required
- def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
+ def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) {
hwFileLock.lock()
try {
// write to temp file and then swap with the highwatermark file
@@ -56,9 +57,7 @@ class HighwaterMarkCheckpoint(val path:
hwFileWriter.newLine()
highwaterMarksPerPartition.foreach { partitionAndHw =>
- val topic = partitionAndHw._1._1
- val partitionId = partitionAndHw._1._2
- hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2))
+ hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2))
hwFileWriter.newLine()
}
hwFileWriter.flush()
@@ -77,9 +76,10 @@ class HighwaterMarkCheckpoint(val path:
hwFileLock.lock()
try {
hwFile.length() match {
- case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
- "partition %d. Returning 0 as the highwatermark".format(partition))
- 0L
+ case 0 =>
+ warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+ "partition %d. Returning 0 as the highwatermark".format(partition))
+ 0L
case _ =>
val hwFileReader = new BufferedReader(new FileReader(hwFile))
val version = hwFileReader.readLine().toShort
@@ -95,17 +95,18 @@ class HighwaterMarkCheckpoint(val path:
// find the index of partition
val partitionIndex = nextHwEntry.indexOf(partitionId)
val topic = nextHwEntry.substring(0, partitionIndex-1)
- ((topic, partitionId.toInt) -> highwaterMark)
+ (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark)
}
hwFileReader.close()
- val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+ val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
hwOpt match {
- case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file"
- .format(hw, topic, partition))
+ case Some(hw) =>
+ debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition))
hw
- case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
- "partition %d. Returning 0 as the highwatermark".format(partition))
- 0L
+ case None =>
+ warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+ "partition %d. Returning 0 as the highwatermark".format(partition))
+ 0L
}
case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
System.exit(1)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Nov 2 19:01:38 2012
@@ -21,7 +21,7 @@ import java.util.Properties
import kafka.message.Message
import kafka.consumer.ConsumerConfig
import java.net.InetAddress
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
/**
* Configuration settings for the kafka server
@@ -74,7 +74,8 @@ class KafkaConfig private (val props: Ve
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
/* the directories in which the log data is kept */
- val logDir = props.getString("log.dir")
+ val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
+ require(logDirs.size > 0)
/* the maximum size of a single log file */
val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Nov 2 19:01:38 2012
@@ -32,7 +32,6 @@ import kafka.controller.{ControllerStat,
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
this.logIdent = "[Kafka Server " + config.brokerId + "], "
- val CleanShutdownFile = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
var socketServer: SocketServer = null
@@ -53,12 +52,6 @@ class KafkaServer(val config: KafkaConfi
info("starting")
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
- var needRecovery = true
- val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
- if (cleanShutDownFile.exists) {
- needRecovery = false
- cleanShutDownFile.delete
- }
/* start scheduler */
kafkaScheduler.startup
@@ -66,11 +59,7 @@ class KafkaServer(val config: KafkaConfi
/* start log manager */
logManager = new LogManager(config,
kafkaScheduler,
- time,
- 1000L * 60 * 60 * config.logRollHours,
- 1000L * 60 * config.logCleanupIntervalMinutes,
- 1000L * 60 * 60 * config.logRetentionHours,
- needRecovery)
+ time)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
@@ -122,25 +111,22 @@ class KafkaServer(val config: KafkaConfi
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
if(requestHandlerPool != null)
- requestHandlerPool.shutdown()
- kafkaScheduler.shutdown()
+ Utils.swallow(requestHandlerPool.shutdown())
+ Utils.swallow(kafkaScheduler.shutdown())
if(apis != null)
- apis.close()
+ Utils.swallow(apis.close())
if(kafkaZookeeper != null)
- kafkaZookeeper.shutdown()
+ Utils.swallow(kafkaZookeeper.shutdown())
if(replicaManager != null)
- replicaManager.shutdown()
+ Utils.swallow(replicaManager.shutdown())
if(socketServer != null)
- socketServer.shutdown()
+ Utils.swallow(socketServer.shutdown())
if(logManager != null)
- logManager.shutdown()
+ Utils.swallow(logManager.shutdown())
if(kafkaController != null)
- kafkaController.shutdown()
+ Utils.swallow(kafkaController.shutdown())
- val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
- debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
- cleanShutDownFile.createNewFile
shutdownLatch.countDown()
info("shut down completed")
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Nov 2 19:01:38 2012
@@ -25,7 +25,7 @@ import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
-import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.common._
import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
@@ -44,8 +44,7 @@ class ReplicaManager(val config: KafkaCo
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
- info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
+ val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
newGauge(
"LeaderCount",
@@ -245,22 +244,12 @@ class ReplicaManager(val config: KafkaCo
* Flushes the highwatermark value for all partitions to the highwatermark file
*/
def checkpointHighWatermarks() {
- val highWaterarksForAllPartitions = allPartitions.map {
- partition =>
- val topic = partition._1._1
- val partitionId = partition._1._2
- val localReplicaOpt = partition._2.getReplica(config.brokerId)
- val hw = localReplicaOpt match {
- case Some(localReplica) => localReplica.highWatermark
- case None =>
- error("Highwatermark for topic %s partition %d doesn't exist during checkpointing"
- .format(topic, partitionId))
- 0L
- }
- (topic, partitionId) -> hw
- }.toMap
- highWatermarkCheckpoint.write(highWaterarksForAllPartitions)
- trace("Checkpointed high watermark data: %s".format(highWaterarksForAllPartitions))
+ val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+ val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+ for((dir, reps) <- replicasByDir) {
+ val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+ highWatermarkCheckpoints(dir).write(hwms)
+ }
}
def shutdown() {
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala?rev=1405102&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala Fri Nov 2 19:01:38 2012
@@ -0,0 +1,64 @@
+package kafka.utils
+
+import java.io._
+import java.nio.channels._
+
+/**
+ * A file lock a la flock/funlock
+ *
+ * The given path will be created and opened if it doesn't exist.
+ */
+class FileLock(val file: File) extends Logging {
+ file.createNewFile()
+ private val channel = new RandomAccessFile(file, "rw").getChannel()
+ private var flock: java.nio.channels.FileLock = null
+
+ /**
+ * Lock the file or throw an exception if the lock is already held
+ */
+ def lock() {
+ this synchronized {
+ trace("Acquiring lock on " + file.getAbsolutePath)
+ flock = channel.lock()
+ }
+ }
+
+ /**
+ * Try to lock the file and return true if the locking succeeds
+ */
+ def tryLock(): Boolean = {
+ this synchronized {
+ trace("Acquiring lock on " + file.getAbsolutePath)
+ try {
+ // weirdly this method will return null if the lock is held by another
+ // process, but will throw an exception if the lock is held by this process
+ // so we have to handle both cases
+ flock = channel.tryLock()
+ flock != null
+ } catch {
+ case e: OverlappingFileLockException => false
+ }
+ }
+ }
+
+ /**
+ * Unlock the lock if it is held
+ */
+ def unlock() {
+ this synchronized {
+ trace("Releasing lock on " + file.getAbsolutePath)
+ if(flock != null)
+ flock.release()
+ }
+ }
+
+ /**
+ * Destroy this lock, closing the associated FileChannel
+ */
+ def destroy() = {
+ this synchronized {
+ unlock()
+ channel.close()
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Fri Nov 2 19:01:38 2012
@@ -226,22 +226,28 @@ object Utils extends Logging {
def rm(file: String): Unit = rm(new File(file))
/**
+ * Recursively delete the list of files/directories and any subfiles (if any exist)
+ * @param files a sequence of paths of the files/directories to be deleted
+ */
+ def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
+
+ /**
* Recursively delete the given file/directory and any subfiles (if any exist)
* @param file The root file at which to begin deleting
*/
- def rm(file: File): Unit = {
- if(file == null) {
- return
- } else if(file.isDirectory) {
- val files = file.listFiles()
- if(files != null) {
- for(f <- files)
- rm(f)
- }
- file.delete()
- } else {
- file.delete()
- }
+ def rm(file: File) {
+ if(file == null) {
+ return
+ } else if(file.isDirectory) {
+ val files = file.listFiles()
+ if(files != null) {
+ for(f <- files)
+ rm(f)
+ }
+ file.delete()
+ } else {
+ file.delete()
+ }
}
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala Fri Nov 2 19:01:38 2012
@@ -22,6 +22,8 @@ import scala.collection._
class VerifiableProperties(val props: Properties) extends Logging {
private val referenceSet = mutable.HashSet[String]()
+
+ def this() = this(new Properties)
def containsKey(name: String): Boolean = {
props.containsKey(name)
@@ -185,4 +187,6 @@ class VerifiableProperties(val props: Pr
info("Property %s is overridden to %s".format(key, props.getProperty(key)))
}
}
+
+ override def toString(): String = props.toString
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Fri Nov 2 19:01:38 2012
@@ -40,7 +40,7 @@ trait KafkaServerTestHarness extends JUn
override def tearDown() {
servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
+ servers.map(server => server.config.logDirs.map(Utils.rm(_)))
super.tearDown
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Nov 2 19:01:38 2012
@@ -18,21 +18,23 @@
package kafka.log
import java.io._
+import java.nio.channels.OverlappingFileLockException
import junit.framework.Assert._
import org.junit.Test
import kafka.common.OffsetOutOfRangeException
import org.scalatest.junit.JUnit3Suite
import kafka.server.KafkaConfig
+import kafka.common._
import kafka.utils._
class LogManagerTest extends JUnit3Suite {
val time: MockTime = new MockTime()
val maxRollInterval = 100
- val maxLogAge = 1000
+ val maxLogAgeHours = 10
var logDir: File = null
var logManager: LogManager = null
- var config:KafkaConfig = null
+ var config: KafkaConfig = null
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
val scheduler = new KafkaScheduler(2)
@@ -41,12 +43,13 @@ class LogManagerTest extends JUnit3Suite
super.setUp()
config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
override val logFileSize = 1024
- override val flushInterval = 100
+ override val flushInterval = 10000
+ override val logRetentionHours = maxLogAgeHours
}
scheduler.startup
- logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
+ logManager = new LogManager(config, scheduler, time)
logManager.startup
- logDir = logManager.logDir
+ logDir = logManager.logDirs(0)
}
override def tearDown() {
@@ -54,13 +57,14 @@ class LogManagerTest extends JUnit3Suite
if(logManager != null)
logManager.shutdown()
Utils.rm(logDir)
+ logManager.logDirs.map(Utils.rm(_))
super.tearDown()
}
@Test
def testCreateLog() {
val log = logManager.getOrCreateLog(name, 0)
- val logFile = new File(config.logDir, name + "-0")
+ val logFile = new File(config.logDirs(0), name + "-0")
assertTrue(logFile.exists)
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
@@ -68,7 +72,7 @@ class LogManagerTest extends JUnit3Suite
@Test
def testGetLog() {
val log = logManager.getLog(name, 0)
- val logFile = new File(config.logDir, name + "-0")
+ val logFile = new File(config.logDirs(0), name + "-0")
assertTrue(!logFile.exists)
}
@@ -87,12 +91,13 @@ class LogManagerTest extends JUnit3Suite
// update the last modified time of all log segments
val logSegments = log.segments.view
- logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs))
+ logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs))
- time.currentMs += maxLogAge + 3000
+ time.currentMs += maxLogAgeHours*60*60*1000 + 1
logManager.cleanupLogs()
assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
+
try {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
@@ -115,8 +120,9 @@ class LogManagerTest extends JUnit3Suite
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
override val logRetentionHours = retentionHours
override val flushInterval = 100
+ override val logRollHours = maxRollInterval
}
- logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false)
+ logManager = new LogManager(config, scheduler, time)
logManager.startup
// create a log
@@ -157,17 +163,47 @@ class LogManagerTest extends JUnit3Suite
override val logFileSize = 1024 *1024 *1024
override val flushSchedulerThreadRate = 50
override val flushInterval = Int.MaxValue
+ override val logRollHours = maxRollInterval
override val flushIntervalMap = Map("timebasedflush" -> 100)
}
- logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
+ logManager = new LogManager(config, scheduler, time)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
- println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime)
- assertTrue("The last flush time has to be within defaultflushInterval of current time ",
- (System.currentTimeMillis - log.getLastFlushedTime) < 150)
+ val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
+ assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
+ ellapsed < 2*config.flushSchedulerThreadRate)
+ }
+
+ @Test
+ def testLeastLoadedAssignment() {
+ // create a log manager with multiple data directories
+ val props = TestUtils.createBrokerConfig(0, -1)
+ val dirs = Seq(TestUtils.tempDir().getAbsolutePath,
+ TestUtils.tempDir().getAbsolutePath,
+ TestUtils.tempDir().getAbsolutePath)
+ props.put("log.directories", dirs.mkString(","))
+ logManager.shutdown()
+ logManager = new LogManager(new KafkaConfig(props), scheduler, time)
+
+ // verify that each new log is assigned to the least loaded data directory
+ for(partition <- 0 until 20) {
+ logManager.getOrCreateLog("test", partition)
+ assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
+ val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
+ assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
+ }
+ }
+
+ def testTwoLogManagersUsingSameDirFails() {
+ try {
+ new LogManager(logManager.config, scheduler, time)
+ fail("Should not be able to create a second log manager instance with the same data directory")
+ } catch {
+ case e: KafkaException => // this is good
+ }
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Nov 2 19:01:38 2012
@@ -79,8 +79,8 @@ class ProducerTest extends JUnit3Suite w
server1.awaitShutdown()
server2.shutdown
server2.awaitShutdown()
- Utils.rm(server1.config.logDir)
- Utils.rm(server2.config.logDir)
+ Utils.rm(server1.config.logDirs)
+ Utils.rm(server2.config.logDirs)
super.tearDown()
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Fri Nov 2 19:01:38 2012
@@ -16,14 +16,15 @@
*/
package kafka.server
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
+import org.junit._
import org.junit.Assert._
import kafka.common.KafkaException
import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime}
+import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
class HighwatermarkPersistenceTest extends JUnit3Suite {
@@ -31,30 +32,37 @@ class HighwatermarkPersistenceTest exten
override val defaultFlushIntervalMs = 100
})
val topic = "foo"
+ val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
+
+ @After
+ def teardown() {
+ for(manager <- logManagers; dir <- manager.logDirs)
+ Utils.rm(dir)
+ }
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
EasyMock.replay(zkClient)
+
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
replicaManager.startup()
replicaManager.checkpointHighWatermarks()
- var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+ var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
- // create leader log
- val log0 = getMockLog
// create leader and follower replicas
+ val log0 = logManagers(0).getOrCreateLog(topic, 0)
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
partition0.addReplicaIfNotExists(leaderReplicaPartition0)
val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
partition0.addReplicaIfNotExists(followerReplicaPartition0)
replicaManager.checkpointHighWatermarks()
- fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+ fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
try {
followerReplicaPartition0.highWatermark
@@ -65,10 +73,9 @@ class HighwatermarkPersistenceTest exten
// set the highwatermark for local replica
partition0.getReplica().get.highWatermark = 5L
replicaManager.checkpointHighWatermarks()
- fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+ fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
EasyMock.verify(zkClient)
- EasyMock.verify(log0)
}
def testHighWatermarkPersistenceMultiplePartitions() {
@@ -81,35 +88,35 @@ class HighwatermarkPersistenceTest exten
val scheduler = new KafkaScheduler(2)
scheduler.startup
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
replicaManager.startup()
replicaManager.checkpointHighWatermarks()
- var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
// create leader log
- val topic1Log0 = getMockLog
+ val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0)
// create a local replica for topic1
val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
replicaManager.checkpointHighWatermarks()
- topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
// set the highwatermark for local replica
topic1Partition0.getReplica().get.highWatermark = 5L
replicaManager.checkpointHighWatermarks()
- topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
// create leader log
- val topic2Log0 = getMockLog
+ val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0)
// create a local replica for topic2
val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
replicaManager.checkpointHighWatermarks()
- var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+ var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
// set the highwatermark for local replica
topic2Partition0.getReplica().get.highWatermark = 15L
@@ -119,19 +126,16 @@ class HighwatermarkPersistenceTest exten
assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
replicaManager.checkpointHighWatermarks()
// verify checkpointed hw for topic 2
- topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+ topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
assertEquals(15L, topic2Partition0Hw)
// verify checkpointed hw for topic 1
- topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(10L, topic1Partition0Hw)
EasyMock.verify(zkClient)
- EasyMock.verify(topic1Log0)
- EasyMock.verify(topic2Log0)
}
- private def getMockLog: Log = {
- val log = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.replay(log)
- log
+ def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
+ replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition)
}
+
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Nov 2 19:01:38 2012
@@ -45,7 +45,7 @@ class LeaderElectionTest extends JUnit3S
override def tearDown() {
servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
+ servers.map(server => Utils.rm(server.config.logDirs))
super.tearDown()
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Fri Nov 2 19:01:38 2012
@@ -45,8 +45,8 @@ class LogRecoveryTest extends JUnit3Suit
val message = new Message("hello".getBytes())
var producer: Producer[Int, Message] = null
- var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
- var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
+ var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
+ var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
def testHWCheckpointNoFailuresSingleLogSegment {
@@ -83,7 +83,7 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read(topic, 0)
assertEquals(numMessages, followerHW)
- servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)})
+ servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
}
def testHWCheckpointWithFailuresSingleLogSegment {
@@ -148,7 +148,7 @@ class LogRecoveryTest extends JUnit3Suit
producer.close()
assertEquals(hw, hwFile1.read(topic, 0))
assertEquals(hw, hwFile2.read(topic, 0))
- servers.foreach(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -165,8 +165,8 @@ class LogRecoveryTest extends JUnit3Suit
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
- hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
- hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+ hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
+ hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
@@ -193,7 +193,7 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read(topic, 0)
assertEquals(hw, followerHW)
- servers.foreach(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDirs))
}
def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -210,8 +210,8 @@ class LogRecoveryTest extends JUnit3Suit
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
- hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
- hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+ hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
+ hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
@@ -263,7 +263,7 @@ class LogRecoveryTest extends JUnit3Suit
producer.close()
assertEquals(hw, hwFile1.read(topic, 0))
assertEquals(hw, hwFile2.read(topic, 0))
- servers.foreach(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDirs))
}
private def sendMessages(n: Int = 1) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Fri Nov 2 19:01:38 2012
@@ -31,71 +31,66 @@ import kafka.utils.{TestUtils, Utils}
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
+ val props = TestUtils.createBrokerConfig(0, port)
+ val config = new KafkaConfig(props)
+
+ val host = "localhost"
+ val topic = "test"
+ val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+ val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
@Test
def testCleanShutdown() {
- val props = TestUtils.createBrokerConfig(0, port)
- val config = new KafkaConfig(props)
-
- val host = "localhost"
- val topic = "test"
- val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
- val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
-
- {
- val server = new KafkaServer(config)
- server.startup()
-
- // create topic
- CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
-
- val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
-
- // send some messages
- producer.send(new ProducerData[Int, Message](topic, 0, sent1))
-
- // do a clean shutdown
- server.shutdown()
- val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
+ var server = new KafkaServer(config)
+ server.startup()
+ var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+
+ // create topic
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+ // send some messages
+ producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+
+ // do a clean shutdown and check that the clean shudown file is written out
+ server.shutdown()
+ for(logDir <- config.logDirs) {
+ val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile)
assertTrue(cleanShutDownFile.exists)
- producer.close()
}
+ producer.close()
+
+ /* now restart the server and check that the written data is still readable and everything still works */
+ server = new KafkaServer(config)
+ server.startup()
+
+ producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+ val consumer = new SimpleConsumer(host,
+ port,
+ 1000000,
+ 64*1024)
+
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
+ var fetchedMessage: ByteBufferMessageSet = null
+ while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
+ fetchedMessage = fetched.messageSet(topic, 0)
+ }
+ TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
+ val newOffset = fetchedMessage.last.nextOffset
+ // send some more messages
+ producer.send(new ProducerData[Int, Message](topic, 0, sent2))
- {
- val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
- val consumer = new SimpleConsumer(host,
- port,
- 1000000,
- 64*1024)
-
- val server = new KafkaServer(config)
- server.startup()
-
- waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-
- var fetchedMessage: ByteBufferMessageSet = null
- while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
- fetchedMessage = fetched.messageSet(topic, 0)
- }
- TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
- val newOffset = fetchedMessage.last.nextOffset
-
- // send some more messages
- producer.send(new ProducerData[Int, Message](topic, 0, sent2))
-
- fetchedMessage = null
- while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
- val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
- fetchedMessage = fetched.messageSet(topic, 0)
- }
- TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
-
- server.shutdown()
- Utils.rm(server.config.logDir)
- producer.close()
+ fetchedMessage = null
+ while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
+ fetchedMessage = fetched.messageSet(topic, 0)
}
+ TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
+ consumer.close()
+ producer.close()
+ server.shutdown()
+ Utils.rm(server.config.logDirs)
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Fri Nov 2 19:01:38 2012
@@ -74,7 +74,6 @@ class SimpleFetchTest extends JUnit3Suit
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
- EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
@@ -169,7 +168,6 @@ class SimpleFetchTest extends JUnit3Suit
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
- EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Nov 2 19:01:38 2012
@@ -483,6 +483,7 @@ object TestUtils extends Logging {
byteBuffer.rewind()
byteBuffer
}
+
}
object TestZKUtils {