Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [17/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic._
+import java.text.NumberFormat
+import java.io._
+import java.nio.channels.FileChannel
+import org.apache.log4j._
+import kafka.message._
+import kafka.utils._
+import kafka.common._
+import kafka.api.OffsetRequest
+import java.util._
+
+private[log] object Log {
+ val FileSuffix = ".kafka"
+
+ /**
+   * Find the range containing the given value in a list of ranges. Does a binary search over the ranges,
+   * but instead of checking for equality it checks for containment within each range. Takes the array size
+   * as an explicit parameter in case the array grows while the search is in progress.
+ *
+ * TODO: This should move into SegmentList.scala
+ */
+ def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
+ if(ranges.size < 1)
+ return None
+
+ // check out of bounds
+ if(value < ranges(0).start || value > ranges(arraySize - 1).start + ranges(arraySize - 1).size)
+ throw new OffsetOutOfRangeException("offset " + value + " is out of range")
+
+ // check at the end
+ if (value == ranges(arraySize - 1).start + ranges(arraySize - 1).size)
+ return None
+
+ var low = 0
+ var high = arraySize - 1
+ while(low <= high) {
+ val mid = (high + low) / 2
+ val found = ranges(mid)
+ if(found.contains(value))
+ return Some(found)
+ else if (value < found.start)
+ high = mid - 1
+ else
+ low = mid + 1
+ }
+ None
+ }
+
+ def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
+ findRange(ranges, value, ranges.length)
+
+ /**
+   * Make a log segment file name from the given byte offset. All this does is pad out the offset with zeros
+   * so that ls sorts the files numerically.
+ */
+ def nameFromOffset(offset: Long): String = {
+ val nf = NumberFormat.getInstance()
+ nf.setMinimumIntegerDigits(20)
+ nf.setMaximumFractionDigits(0)
+ nf.setGroupingUsed(false)
+ nf.format(offset) + Log.FileSuffix
+ }
+}
+
+/**
+ * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
+ */
+private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+ @volatile var deleted = false
+ def size: Long = messageSet.highWaterMark
+ override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
+}
+
+
+/**
+ * An append-only log for storing messages.
+ */
+@threadsafe
+private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) {
+
+ private val logger = Logger.getLogger(classOf[Log])
+
+ /* A lock that guards all modifications to the log */
+ private val lock = new Object
+
+  /* The current number of unflushed messages appended to the log */
+ private val unflushed = new AtomicInteger(0)
+
+  /* The last time the log was flushed */
+ private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+
+ /* The actual segments of the log */
+ private[log] val segments: SegmentList[LogSegment] = loadSegments()
+
+ /* The name of this log */
+ val name = dir.getName()
+
+ private val logStats = new LogStats(this)
+
+ Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+
+ /* Load the log segments from the log files on disk */
+ private def loadSegments(): SegmentList[LogSegment] = {
+ // open all the segments read-only
+ val accum = new ArrayList[LogSegment]
+ val ls = dir.listFiles()
+ if(ls != null) {
+ for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
+ if(!file.canRead)
+ throw new IOException("Could not read file " + file)
+ val filename = file.getName()
+ val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
+ val messageSet = new FileMessageSet(file, false)
+ accum.add(new LogSegment(file, messageSet, start))
+ }
+ }
+
+ if(accum.size == 0) {
+ // no existing segments, create a new mutable segment
+ val newFile = new File(dir, Log.nameFromOffset(0))
+ val set = new FileMessageSet(newFile, true)
+ accum.add(new LogSegment(newFile, set, 0))
+ } else {
+ // there is at least one existing segment, validate and recover them/it
+ // sort segments into ascending order for fast searching
+ Collections.sort(accum, new Comparator[LogSegment] {
+ def compare(s1: LogSegment, s2: LogSegment): Int = {
+ if(s1.start == s2.start) 0
+ else if(s1.start < s2.start) -1
+ else 1
+ }
+ })
+ validateSegments(accum)
+
+      // make the final segment mutable and run recovery on it if necessary
+ val last = accum.remove(accum.size - 1)
+ last.messageSet.close()
+ logger.info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
+ val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+ accum.add(mutable)
+ }
+ new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
+ }
+
+ /**
+ * Check that the ranges and sizes add up, otherwise we have lost some data somewhere
+ */
+ private def validateSegments(segments: ArrayList[LogSegment]) {
+ lock synchronized {
+ for(i <- 0 until segments.size - 1) {
+ val curr = segments.get(i)
+ val next = segments.get(i+1)
+ if(curr.start + curr.size != next.start)
+ throw new IllegalStateException("The following segments don't validate: " +
+ curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+ }
+ }
+ }
+
+ /**
+ * The number of segments in the log
+ */
+ def numberOfSegments: Int = segments.view.length
+
+ /**
+ * Close this log
+ */
+ def close() {
+ lock synchronized {
+ for(seg <- segments.view)
+ seg.messageSet.close()
+ }
+ }
+
+ /**
+ * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+ */
+ def append(messages: MessageSet): Unit = {
+ // validate the messages
+ var numberOfMessages = 0
+ for(messageAndOffset <- messages) {
+ if(!messageAndOffset.message.isValid)
+ throw new InvalidMessageException()
+      numberOfMessages += 1
+ }
+
+ logStats.recordAppendedMessages(numberOfMessages)
+
+ // they are valid, insert them in the log
+ lock synchronized {
+ val segment = segments.view.last
+ segment.messageSet.append(messages)
+ maybeFlush(numberOfMessages)
+ maybeRoll(segment)
+ }
+ }
+
+ /**
+ * Read from the log file at the given offset
+ */
+ def read(offset: Long, length: Int): MessageSet = {
+ val view = segments.view
+ Log.findRange(view, offset, view.length) match {
+ case Some(segment) => segment.messageSet.read((offset - segment.start), length)
+ case _ => MessageSet.Empty
+ }
+ }
+
+ /**
+   * Mark as deleted the longest prefix of segments matching the given predicate, remove them from the log,
+   * and return them
+ */
+ def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
+ lock synchronized {
+ val view = segments.view
+ val deletable = view.takeWhile(predicate)
+ for(seg <- deletable)
+ seg.deleted = true
+ val numToDelete = deletable.size
+ // if we are deleting everything, create a new empty segment
+ if(numToDelete == view.size)
+ roll()
+ segments.trunc(numToDelete)
+ }
+ }
+
+ /**
+ * Get the size of the log in bytes
+ */
+ def size: Long =
+ segments.view.foldLeft(0L)(_ + _.size)
+
+ /**
+ * The byte offset of the message that will be appended next.
+ */
+ def nextAppendOffset: Long = {
+ flush
+ val last = segments.view.last
+ last.start + last.size
+ }
+
+ /**
+   * Get the current high watermark of the log
+ */
+ def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark
+
+ /**
+ * Roll the log over if necessary
+ */
+ private def maybeRoll(segment: LogSegment) {
+ if(segment.messageSet.sizeInBytes > maxSize)
+ roll()
+ }
+
+ /**
+ * Create a new segment and make it active
+ */
+ def roll() {
+ lock synchronized {
+ val last = segments.view.last
+ val newOffset = nextAppendOffset
+ val newFile = new File(dir, Log.nameFromOffset(newOffset))
+ if(logger.isDebugEnabled)
+ logger.debug("Rolling log '" + name + "' to " + newFile.getName())
+ segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+ }
+ }
+
+ /**
+ * Flush the log if necessary
+ */
+ private def maybeFlush(numberOfMessages : Int) {
+ if(unflushed.addAndGet(numberOfMessages) >= flushInterval) {
+ flush()
+ }
+ }
+
+ /**
+ * Flush this log file to the physical disk
+ */
+ def flush() = {
+ lock synchronized {
+ if(logger.isDebugEnabled)
+ logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+ System.currentTimeMillis)
+ segments.view.last.messageSet.flush()
+ unflushed.set(0)
+ lastflushedTime.set(System.currentTimeMillis)
+ }
+ }
+
+ def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+ val segsArray = segments.view
+ var offsetTimeArray: Array[Tuple2[Long, Long]] = null
+ if (segsArray.last.size > 0)
+ offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
+ else
+ offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
+
+ for (i <- 0 until segsArray.length)
+ offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
+ if (segsArray.last.size > 0)
+ offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
+
+ var startIndex = -1
+ request.time match {
+ case OffsetRequest.LatestTime =>
+ startIndex = offsetTimeArray.length - 1
+ case OffsetRequest.EarliestTime =>
+ startIndex = 0
+ case _ =>
+ var isFound = false
+ if(logger.isDebugEnabled) {
+        logger.debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("; "))
+ }
+ startIndex = offsetTimeArray.length - 1
+ while (startIndex >= 0 && !isFound) {
+ if (offsetTimeArray(startIndex)._2 <= request.time)
+ isFound = true
+ else
+            startIndex -= 1
+ }
+ }
+
+ val retSize = request.maxNumOffsets.min(startIndex + 1)
+ val ret = new Array[Long](retSize)
+ for (j <- 0 until retSize) {
+ ret(j) = offsetTimeArray(startIndex)._1
+ startIndex -= 1
+ }
+ ret
+ }
+
+  def getTopicName(): String =
+    name.substring(0, name.lastIndexOf("-"))
+
+  def getLastFlushedTime(): Long = lastflushedTime.get
+}
+
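
To make the segment lookup above concrete: a minimal standalone sketch of the same
binary search, assuming a simplified Range with just a start and a size (the real
Range trait lives in kafka.utils and is not part of this commit):

    case class SimpleRange(start: Long, size: Long) {
      def contains(value: Long) = value >= start && value < start + size
    }

    def findRange(ranges: Array[SimpleRange], value: Long): Option[SimpleRange] = {
      var low = 0
      var high = ranges.length - 1
      while (low <= high) {
        val mid = (low + high) / 2
        if (ranges(mid).contains(value)) return Some(ranges(mid)) // look within the range, not for equality
        else if (value < ranges(mid).start) high = mid - 1
        else low = mid + 1
      }
      None
    }

    // Segments covering byte ranges [0,100), [100,250), [250,400): offset 120 falls
    // in the second segment. Log.nameFromOffset zero-pads to 20 digits, so the third
    // segment's file would be named "00000000000000000250.kafka".
    val segments = Array(SimpleRange(0, 100), SimpleRange(100, 150), SimpleRange(250, 150))
    assert(findRange(segments, 120) == Some(SimpleRange(100, 150)))
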
Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import org.apache.log4j.Logger
+import kafka.utils._
+import scala.actors.Actor
+import scala.collection._
+import java.util.concurrent.CountDownLatch
+import kafka.server.{KafkaConfig, KafkaZooKeeper}
+import kafka.common.{InvalidTopicException, InvalidPartitionException}
+
+/**
+ * Creates and hands out logs, one per topic/partition directory
+ */
+@threadsafe
+private[kafka] class LogManager(val config: KafkaConfig,
+ private val scheduler: KafkaScheduler,
+ private val time: Time,
+ val logCleanupIntervalMs: Long,
+ val logCleanupDefaultAgeMs: Long,
+ needRecovery: Boolean) {
+
+ val logDir: File = new File(config.logDir)
+ private val numPartitions = config.numPartitions
+ private val maxSize: Long = config.logFileSize
+ private val flushInterval = config.flushInterval
+ private val topicPartitionsMap = config.topicPartitionsMap
+ private val logger = Logger.getLogger(classOf[LogManager])
+ private val logCreationLock = new Object
+ private val random = new java.util.Random
+ private var kafkaZookeeper: KafkaZooKeeper = null
+ private var zkActor: Actor = null
+ private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
+ private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
+ private val logFlushIntervalMap = config.flushIntervalMap
+ private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+
+ /* Initialize a log for each subdirectory of the main log directory */
+ private val logs = new Pool[String, Pool[Int, Log]]()
+ if(!logDir.exists()) {
+ logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
+ logDir.mkdirs()
+ }
+ if(!logDir.isDirectory() || !logDir.canRead())
+ throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.")
+ val subDirs = logDir.listFiles()
+ if(subDirs != null) {
+ for(dir <- subDirs) {
+ if(!dir.isDirectory()) {
+        logger.warn("Skipping unexpected file '" + dir.getAbsolutePath() + "'--should it be there?")
+ } else {
+ logger.info("Loading log '" + dir.getName() + "'")
+ val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val topicPartition = Utils.getTopicPartition(dir.getName)
+        logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
+        val parts = logs.get(topicPartition._1)
+        parts.put(topicPartition._2, log)
+ }
+ }
+ }
+
+ /* Schedule the cleanup task to delete old logs */
+ if(scheduler != null) {
+ logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms")
+ scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
+ }
+
+ if(config.enableZookeeper) {
+ kafkaZookeeper = new KafkaZooKeeper(config, this)
+ kafkaZookeeper.startup
+ zkActor = new Actor {
+ def act() {
+ loop {
+ receive {
+ case topic: String =>
+ try {
+ kafkaZookeeper.registerTopicInZk(topic)
+ }
+ catch {
+ case e => logger.error(e) // log it and let it go
+ }
+ case StopActor =>
+ logger.info("zkActor stopped")
+ exit
+ }
+ }
+ }
+ }
+ zkActor.start
+ }
+
+ case object StopActor
+
+ private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
+ var ret = new mutable.HashMap[String, Long]
+ for ( (topic, hour) <- logRetentionHourMap )
+ ret.put(topic, hour * 60 * 60 * 1000L)
+ ret
+ }
+
+ /**
+ * Register this broker in ZK for the first time.
+ */
+ def startup() {
+ if(config.enableZookeeper) {
+ kafkaZookeeper.registerBrokerInZk()
+ for (topic <- getAllTopics)
+ kafkaZookeeper.registerTopicInZk(topic)
+ startupLatch.countDown
+ }
+ logger.info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
+ logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
+ }
+
+ private def awaitStartup() {
+ if (config.enableZookeeper)
+ startupLatch.await
+ }
+
+ def registerNewTopicInZK(topic: String) {
+ if (config.enableZookeeper)
+ zkActor ! topic
+ }
+
+ /**
+ * Create a log for the given topic and the given partition
+ */
+ private def createLog(topic: String, partition: Int): Log = {
+ logCreationLock synchronized {
+ val d = new File(logDir, topic + "-" + partition)
+ d.mkdirs()
+ new Log(d, maxSize, flushInterval, false)
+ }
+ }
+
+
+ def chooseRandomPartition(topic: String): Int = {
+ random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
+ }
+
+ /**
+ * Create the log if it does not exist, if it exists just return it
+ */
+ def getOrCreateLog(topic: String, partition: Int): Log = {
+ awaitStartup
+ if (topic.length <= 0)
+ throw new InvalidTopicException("topic name can't be empty")
+ if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
+      logger.warn("Wrong partition " + partition + ", valid partitions are 0 through " +
+                  (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+ throw new InvalidPartitionException("wrong partition " + partition)
+ }
+ var hasNewTopic = false
+ var parts = logs.get(topic)
+ if (parts == null) {
+ val found = logs.putIfNotExists(topic, new Pool[Int, Log])
+ if (found == null)
+ hasNewTopic = true
+ parts = logs.get(topic)
+ }
+ var log = parts.get(partition)
+ if(log == null) {
+ log = createLog(topic, partition)
+ val found = parts.putIfNotExists(partition, log)
+ if(found != null) {
+ // there was already somebody there
+ log.close()
+ log = found
+ }
+ else
+ logger.info("Created log for '" + topic + "'-" + partition)
+ }
+
+ if (hasNewTopic)
+ registerNewTopicInZK(topic)
+ log
+ }
+
+ /**
+   * Delete any log segments that have outlived their retention period
+ */
+ def cleanupLogs() {
+ logger.debug("Beginning log cleanup...")
+ val iter = getLogIterator
+ var total = 0
+ val startMs = time.milliseconds
+ while(iter.hasNext) {
+ val log = iter.next
+ logger.debug("Garbage collecting '" + log.name + "'")
+ var logCleanupThresholdMS = this.logCleanupDefaultAgeMs
+ val topic = Utils.getTopicPartition(log.dir.getName)._1
+ if (logRetentionMSMap.contains(topic))
+ logCleanupThresholdMS = logRetentionMSMap(topic)
+ val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+ for(segment <- toBeDeleted) {
+ logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+ Utils.swallow(logger.warn, segment.messageSet.close())
+ if(!segment.file.delete())
+          logger.warn("Delete of log segment " + segment.file.getName() + " failed.")
+ else
+ total += 1
+ }
+ }
+ logger.debug("Log cleanup completed. " + total + " files deleted in " +
+ (time.milliseconds - startMs) / 1000 + " seconds")
+ }
+
+ /**
+ * Close all the logs
+ */
+ def close() {
+ logFlusherScheduler.shutdown
+ val iter = getLogIterator
+ while(iter.hasNext)
+ iter.next.close()
+ if (config.enableZookeeper) {
+ zkActor ! StopActor
+ kafkaZookeeper.close
+ }
+ }
+
+ private def getLogIterator(): Iterator[Log] = {
+ new IteratorTemplate[Log] {
+ val partsIter = logs.values.iterator
+ var logIter: Iterator[Log] = null
+
+ override def makeNext(): Log = {
+ while (true) {
+ if (logIter != null && logIter.hasNext)
+ return logIter.next
+ if (!partsIter.hasNext)
+ return allDone
+ logIter = partsIter.next.values.iterator
+ }
+ // should never reach here
+ assert(false)
+ return allDone
+ }
+ }
+ }
+
+ private def flushAllLogs() = {
+ if (logger.isDebugEnabled)
+      logger.debug("Checking flush intervals and flushing all eligible logs")
+
+ for (log <- getLogIterator)
+ {
+ try{
+ val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
+ var logFlushInterval = config.defaultFlushIntervalMs
+ if(logFlushIntervalMap.contains(log.getTopicName))
+ logFlushInterval = logFlushIntervalMap(log.getTopicName)
+ if (logger.isDebugEnabled)
+ logger.debug(log.getTopicName + " flush interval " + logFlushInterval +
+            " last flushed " + log.getLastFlushedTime + ", time since last flush: " + timeSinceLastFlush)
+ if(timeSinceLastFlush >= logFlushInterval)
+ log.flush
+ }
+ catch {
+ case e =>
+ logger.error("error flushing " + log.getTopicName, e)
+ e match {
+ case _: IOException =>
+              logger.error("force shutdown due to error in flushAllLogs: " + e)
+ Runtime.getRuntime.halt(1)
+ case _ =>
+ }
+ }
+ }
+ }
+
+
+ def getAllTopics(): Iterator[String] = logs.keys.iterator
+ def getTopicPartitionsMap() = topicPartitionsMap
+}
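
A hedged sketch of the directory layout LogManager relies on: each log lives under
logDir in a directory named "<topic>-<partition>", so splitting on the last '-'
recovers both parts (Utils.getTopicPartition, not shown in this commit, presumably
does the same):

    def topicPartition(dirName: String): (String, Int) = {
      val idx = dirName.lastIndexOf("-")
      (dirName.substring(0, idx), dirName.substring(idx + 1).toInt)
    }

    // "orders-3" names partition 3 of topic "orders"; Log.getTopicName recovers
    // the topic the same way.
    assert(topicPartition("orders-3") == ("orders", 3))
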
Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.atomic.AtomicLong
+
+trait LogStatsMBean {
+ def getName(): String
+ def getSize(): Long
+ def getNumberOfSegments: Int
+ def getCurrentOffset: Long
+ def getNumAppendedMessages: Long
+}
+
+class LogStats(val log: Log) extends LogStatsMBean {
+ private val numCumulatedMessages = new AtomicLong(0)
+
+ def getName(): String = log.name
+
+ def getSize(): Long = log.size
+
+ def getNumberOfSegments: Int = log.numberOfSegments
+
+ def getCurrentOffset: Long = log.getHighwaterMark
+
+ def getNumAppendedMessages: Long = numCumulatedMessages.get
+
+ def recordAppendedMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages)
+}
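
For readers unfamiliar with the MBean plumbing: LogStats follows the JMX standard
MBean convention (an implementation class plus an interface named <Class>MBean in
the same package), so registration needs nothing beyond the platform MBean server.
A minimal sketch, assuming an existing Log instance named log; Utils.registerMBean
used in Log.scala presumably wraps the same call:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    val stats = new LogStats(log)   // log: an existing kafka.log.Log (assumed)
    ManagementFactory.getPlatformMBeanServer
      .registerMBean(stats, new ObjectName("kafka:type=kafka.logs." + log.name))
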
Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.atomic._
+import reflect._
+import scala.math._
+
+private[log] object SegmentList {
+ val MaxAttempts: Int = 20
+}
+
+/**
+ * A copy-on-write list implementation that provides consistent views. The view() method
+ * provides an immutable sequence representing a consistent state of the list. The user can do
+ * iterative operations on this sequence such as binary search without locking all access to the list.
+ * Even if the underlying list changes, no change will be visible through a previously obtained view.
+ */
+private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) {
+
+ val contents: AtomicReference[Array[T]] = new AtomicReference(seq.toArray)
+
+ /**
+ * Append the given items to the end of the list
+ */
+ def append(ts: T*)(implicit m: ClassManifest[T]) {
+ while(true){
+ val curr = contents.get()
+ val updated = new Array[T](curr.length + ts.length)
+ Array.copy(curr, 0, updated, 0, curr.length)
+ for(i <- 0 until ts.length)
+ updated(curr.length + i) = ts(i)
+ if(contents.compareAndSet(curr, updated))
+ return
+ }
+ }
+
+
+ /**
+ * Delete the first n items from the list
+ */
+ def trunc(newStart: Int): Seq[T] = {
+ if(newStart < 0)
+      throw new IllegalArgumentException("Starting index must be non-negative.")
+ var deleted: Array[T] = null
+ var done = false
+ while(!done) {
+ val curr = contents.get()
+ val newLength = max(curr.length - newStart, 0)
+ val updated = new Array[T](newLength)
+ Array.copy(curr, min(newStart, curr.length - 1), updated, 0, newLength)
+ if(contents.compareAndSet(curr, updated)) {
+ deleted = new Array[T](newStart)
+ Array.copy(curr, 0, deleted, 0, curr.length - newLength)
+ done = true
+ }
+ }
+ deleted
+ }
+
+ /**
+ * Get a consistent view of the sequence
+ */
+ def view: Array[T] = contents.get()
+
+ /**
+ * Nicer toString method
+ */
+  override def toString(): String = view.mkString("SegmentList(", ", ", ")")
+
+}
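
The append() above is a classic compare-and-swap retry loop. A self-contained sketch
of the same pattern on a plain Int array shows why readers never need a lock: a
reader that grabbed the old array keeps a consistent snapshot while a writer swaps
in the copy:

    import java.util.concurrent.atomic.AtomicReference

    val contents = new AtomicReference(Array(1, 2, 3))

    def append(x: Int): Unit = {
      while (true) {
        val curr = contents.get()
        val updated = curr :+ x                    // copy, never mutate in place
        if (contents.compareAndSet(curr, updated))
          return                                   // no one raced us; done
        // another thread swapped first: reread and retry
      }
    }

    val snapshot = contents.get()                  // a reader's consistent view
    append(4)
    assert(snapshot.toSeq == Seq(1, 2, 3))         // old view is untouched
    assert(contents.get().toSeq == Seq(1, 2, 3, 4))
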
Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+The log management system for Kafka.
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,26 @@
+package kafka.message
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
+ override def read():Int = {
+ buffer.hasRemaining match {
+ case true =>
+ (buffer.get() & 0xFF)
+ case false => -1
+ }
+ }
+
+ override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
+ buffer.hasRemaining match {
+ case true =>
+ // Read only what's left
+ val realLen = math.min(len, buffer.remaining())
+ buffer.get(bytes, off, realLen)
+ realLen
+ case false => -1
+ }
+ }
+}
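
A usage sketch: wrap a buffer and drain it through the InputStream API; this is how
CompressionUtils below feeds a compressed message payload into a GZIPInputStream:

    import java.nio.ByteBuffer

    val in = new ByteBufferBackedInputStream(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    val chunk = new Array[Byte](8)
    val n = in.read(chunk, 0, chunk.length)   // reads only what's left: 5 bytes
    assert(n == 5)
    assert(new String(chunk, 0, n, "UTF-8") == "hello")
    assert(in.read() == -1)                   // buffer exhausted
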
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import scala.collection.mutable
+import org.apache.log4j.Logger
+import kafka.common.{InvalidMessageSizeException, ErrorMapping}
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+import kafka.utils.IteratorTemplate
+
+/**
+ * A sequence of messages stored in a byte buffer
+ *
+ * There are two ways to create a ByteBufferMessageSet
+ *
+ * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
+ *
+ * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
+ *
+ */
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+ private val logger = Logger.getLogger(getClass())
+ private var validByteCount = -1L
+ private var shallowValidByteCount = -1L
+ private var deepValidByteCount = -1L
+
+ def this(compressionCodec: CompressionCodec, messages: Message*) {
+ this(
+ compressionCodec match {
+ case NoCompressionCodec =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ for (message <- messages) {
+ message.serializeTo(buffer)
+ }
+ buffer.rewind
+ buffer
+ case _ =>
+ val message = CompressionUtils.compress(messages, compressionCodec)
+ val buffer = ByteBuffer.allocate(message.serializedSize)
+ message.serializeTo(buffer)
+ buffer.rewind
+ buffer
+ }, 0L, ErrorMapping.NoError)
+ }
+
+ def this(messages: Message*) {
+ this(NoCompressionCodec, messages: _*)
+ }
+
+ def getInitialOffset = initialOffset
+
+ def getBuffer = buffer
+
+ def getErrorCode = errorCode
+
+ def serialized(): ByteBuffer = buffer
+
+ def validBytes: Long = deepValidBytes
+
+ def shallowValidBytes: Long = {
+ if(shallowValidByteCount < 0) {
+ val iter = deepIterator
+ while(iter.hasNext) {
+ val messageAndOffset = iter.next
+ shallowValidByteCount = messageAndOffset.offset
+ }
+ }
+ shallowValidByteCount - initialOffset
+ }
+
+ def deepValidBytes: Long = {
+ if (deepValidByteCount < 0) {
+ val iter = deepIterator
+ while (iter.hasNext)
+ iter.next
+ }
+ deepValidByteCount
+ }
+
+ /** Write the messages in this set to the given channel */
+ def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
+ channel.write(buffer.duplicate)
+
+ override def iterator: Iterator[MessageAndOffset] = deepIterator
+
+ private def deepIterator(): Iterator[MessageAndOffset] = {
+ ErrorMapping.maybeThrowException(errorCode)
+ new IteratorTemplate[MessageAndOffset] {
+ var topIter = buffer.slice()
+ var currValidBytes = initialOffset
+ var innerIter:Iterator[MessageAndOffset] = null
+ var lastMessageSize = 0L
+
+ def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
+
+ def makeNextOuter: MessageAndOffset = {
+ if (topIter.remaining < 4) {
+ deepValidByteCount = currValidBytes
+ return allDone()
+ }
+ val size = topIter.getInt()
+ lastMessageSize = size
+
+ if(logger.isTraceEnabled) {
+ logger.trace("Remaining bytes in iterator = " + topIter.remaining)
+ logger.trace("size of data = " + size)
+ }
+ if(size < 0 || topIter.remaining < size) {
+ deepValidByteCount = currValidBytes
+ if (currValidBytes == 0 || size < 0)
+            throw new InvalidMessageSizeException(("invalid message size: %d, only received bytes: %d " +
+              "at %d. possible causes: (1) a single message larger than the fetch size; (2) log corruption")
+              .format(size, topIter.remaining, currValidBytes))
+ return allDone()
+ }
+ val message = topIter.slice()
+ message.limit(size)
+ topIter.position(topIter.position + size)
+ val newMessage = new Message(message)
+ newMessage.compressionCodec match {
+ case NoCompressionCodec =>
+ if(logger.isDebugEnabled)
+ logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = null
+ currValidBytes += 4 + size
+ new MessageAndOffset(newMessage, currValidBytes)
+ case _ =>
+ if(logger.isDebugEnabled)
+ logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = CompressionUtils.decompress(newMessage).deepIterator
+ makeNext()
+ }
+ }
+
+ override def makeNext(): MessageAndOffset = {
+ if(logger.isDebugEnabled)
+ logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
+ innerDone match {
+ case true => makeNextOuter
+ case false => {
+ val messageAndOffset = innerIter.next
+ if(!innerIter.hasNext)
+ currValidBytes += 4 + lastMessageSize
+ new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ }
+ }
+ }
+ }
+ }
+
+ def sizeInBytes: Long = buffer.limit
+
+ override def toString: String = {
+ val builder = new StringBuilder()
+ builder.append("ByteBufferMessageSet(")
+ for(message <- this) {
+ builder.append(message)
+ builder.append(", ")
+ }
+ builder.append(")")
+ builder.toString
+ }
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case that: ByteBufferMessageSet =>
+ (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+ case _ => false
+ }
+ }
+
+ override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
+
+ override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+}
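
A hedged usage sketch of "Option 2" from the class comment above: producer-style
construction from in-memory messages, then iteration. Note that each MessageAndOffset
carries the offset of the *next* entry (the running count of valid bytes), not the
offset of the message itself:

    val set = new ByteBufferMessageSet(NoCompressionCodec,
                                       new Message("hello".getBytes),
                                       new Message("world".getBytes))
    for (mo <- set)
      println("entry ends at byte " + mo.offset)   // prints 15, then 30: each entry is
                                                   // a 4-byte size prefix plus an
                                                   // 11-byte message
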
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.message
+
+object CompressionCodec {
+ def getCompressionCodec(codec: Int): CompressionCodec = {
+ codec match {
+ case 0 => NoCompressionCodec
+ case 1 => GZIPCompressionCodec
+ case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
+ }
+ }
+}
+
+sealed trait CompressionCodec { def codec: Int }
+
+case object DefaultCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
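
Note that DefaultCompressionCodec and GZIPCompressionCodec deliberately share codec
id 1: the default codec simply is GZIP, and getCompressionCodec maps id 1 back to the
GZIP object. A quick round-trip sketch:

    assert(DefaultCompressionCodec.codec == GZIPCompressionCodec.codec)
    assert(CompressionCodec.getCompressionCodec(GZIPCompressionCodec.codec) == GZIPCompressionCodec)
    assert(CompressionCodec.getCompressionCodec(NoCompressionCodec.codec) == NoCompressionCodec)
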
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io.ByteArrayOutputStream
+import java.io.IOException
+import java.io.InputStream
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+import java.nio.ByteBuffer
+import org.apache.log4j.Logger
+
+object CompressionUtils {
+ private val logger = Logger.getLogger(getClass)
+
+ def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec)
+
+  def compress(messages: Iterable[Message], compressionCodec: CompressionCodec): Message = compressionCodec match {
+    // DefaultCompressionCodec shares codec id 1 with GZIPCompressionCodec, so both branches compress via GZIP
+    case DefaultCompressionCodec | GZIPCompressionCodec =>
+      val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream()
+      val gzipOutput: GZIPOutputStream = new GZIPOutputStream(outputStream)
+      if(logger.isDebugEnabled)
+        logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+
+      val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+      messages.foreach(m => m.serializeTo(messageByteBuffer))
+      messageByteBuffer.rewind
+
+      try {
+        gzipOutput.write(messageByteBuffer.array)
+      } catch {
+        case e: IOException =>
+          logger.error("Error while writing to the GZIP output stream", e)
+          throw e
+      } finally {
+        // closing the GZIP stream finishes the compressed block; ByteArrayOutputStream.close is a no-op
+        gzipOutput.close()
+        outputStream.close()
+      }
+
+      new Message(outputStream.toByteArray, compressionCodec)
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+  }
+
+  def decompress(message: Message): ByteBufferMessageSet = message.compressionCodec match {
+    case DefaultCompressionCodec | GZIPCompressionCodec =>
+      val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream
+      val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload)
+      val gzipIn: GZIPInputStream = new GZIPInputStream(inputStream)
+      val intermediateBuffer = new Array[Byte](1024)
+
+      try {
+        Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+          outputStream.write(intermediateBuffer, 0, dataRead)
+        }
+      } catch {
+        case e: IOException =>
+          logger.error("Error while reading from the GZIP input stream", e)
+          throw e
+      } finally {
+        gzipIn.close()
+        outputStream.close()
+      }
+
+      val outputBuffer = ByteBuffer.allocate(outputStream.size)
+      outputBuffer.put(outputStream.toByteArray)
+      outputBuffer.rewind
+      new ByteBufferMessageSet(outputBuffer)
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
+  }
+}
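
A round-trip sketch: a batch of messages is serialized, gzipped, and wrapped in a
single carrier Message; decompress unwraps it back into a ByteBufferMessageSet whose
(deep) iterator yields the original messages:

    val original = Seq(new Message("a".getBytes), new Message("b".getBytes))
    val wrapper  = CompressionUtils.compress(original, GZIPCompressionCodec)
    assert(wrapper.compressionCodec == GZIPCompressionCodec)

    val restored = CompressionUtils.decompress(wrapper)
    assert(restored.iterator.map(_.message).toSeq == original)
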
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import org.apache.log4j.Logger
+
+import kafka._
+import kafka.message._
+import kafka.utils._
+
+/**
+ * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
+ * will fail on an immutable message set. An optional limit and offset can be applied to the message set
+ * which will control the offset into the file and the effective length into the file from which
+ * messages will be read
+ */
+@nonthreadsafe
+class FileMessageSet private[kafka](private[message] val channel: FileChannel,
+ private[message] val offset: Long,
+ private[message] val limit: Long,
+ val mutable: Boolean,
+ val needRecover: AtomicBoolean) extends MessageSet {
+
+ private val setSize = new AtomicLong()
+ private val setHighWaterMark = new AtomicLong()
+ private val logger = Logger.getLogger(classOf[FileMessageSet])
+
+ if(mutable) {
+ if(limit < Long.MaxValue || offset > 0)
+ throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
+
+ if (needRecover.get) {
+      // run recovery, which truncates any incomplete trailing entry and leaves the position at the end of the file
+ val startMs = System.currentTimeMillis
+ val truncated = recover()
+ logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
+ " seconds. " + truncated + " bytes truncated.")
+ }
+ else {
+ setSize.set(channel.size())
+ setHighWaterMark.set(sizeInBytes)
+ channel.position(channel.size)
+ }
+ } else {
+ setSize.set(scala.math.min(channel.size(), limit) - offset)
+ setHighWaterMark.set(sizeInBytes)
+ if(logger.isDebugEnabled)
+ logger.debug("initializing high water mark in immutable mode: " + highWaterMark)
+ }
+
+ /**
+ * Create a file message set with no limit or offset
+ */
+ def this(channel: FileChannel, mutable: Boolean) =
+ this(channel, 0, Long.MaxValue, mutable, new AtomicBoolean(false))
+
+ /**
+ * Create a file message set with no limit or offset
+ */
+ def this(file: File, mutable: Boolean) =
+ this(Utils.openChannel(file, mutable), mutable)
+
+ /**
+ * Create a file message set with no limit or offset
+ */
+ def this(channel: FileChannel, mutable: Boolean, needRecover: AtomicBoolean) =
+ this(channel, 0, Long.MaxValue, mutable, needRecover)
+
+ /**
+ * Create a file message set with no limit or offset
+ */
+ def this(file: File, mutable: Boolean, needRecover: AtomicBoolean) =
+ this(Utils.openChannel(file, mutable), mutable, needRecover)
+
+
+ /**
+ * Return a message set which is a view into this set starting from the given offset and with the given size limit.
+ */
+ def read(readOffset: Long, size: Long): MessageSet = {
+ new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+ false, new AtomicBoolean(false))
+ }
+
+ /**
+   * Write some of this set to the given channel, return the amount written
+ */
+ def writeTo(destChannel: WritableByteChannel, writeOffset: Long, size: Long): Long =
+ channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel)
+
+ /**
+ * Get an iterator over the messages in the set
+ */
+ override def iterator: Iterator[MessageAndOffset] = {
+ new IteratorTemplate[MessageAndOffset] {
+ var location = offset
+
+ override def makeNext(): MessageAndOffset = {
+ // read the size of the item
+ val sizeBuffer = ByteBuffer.allocate(4)
+ channel.read(sizeBuffer, location)
+ if(sizeBuffer.hasRemaining)
+ return allDone()
+
+ sizeBuffer.rewind()
+ val size: Int = sizeBuffer.getInt()
+ if (size < Message.MinHeaderSize)
+ return allDone()
+
+ // read the item itself
+ val buffer = ByteBuffer.allocate(size)
+ channel.read(buffer, location + 4)
+ if(buffer.hasRemaining)
+ return allDone()
+ buffer.rewind()
+
+ // increment the location and return the item
+ location += size + 4
+ new MessageAndOffset(new Message(buffer), location)
+ }
+ }
+ }
+
+ /**
+ * The number of bytes taken up by this file set
+ */
+ def sizeInBytes(): Long = setSize.get()
+
+ /**
+ * The high water mark
+ */
+ def highWaterMark(): Long = setHighWaterMark.get()
+
+ def checkMutable(): Unit = {
+ if(!mutable)
+ throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
+ }
+
+ /**
+   * Append the given message set to this message set
+ */
+ def append(messages: MessageSet): Unit = {
+ checkMutable()
+ var written = 0L
+ while(written < messages.sizeInBytes)
+ written += messages.writeTo(channel, 0, messages.sizeInBytes)
+ setSize.getAndAdd(written)
+ }
+
+ /**
+ * Commit all written data to the physical disk
+ */
+ def flush() = {
+ checkMutable()
+ val startTime = SystemTime.milliseconds
+ channel.force(true)
+ val elapsedTime = SystemTime.milliseconds - startTime
+ LogFlushStats.recordFlushRequest(elapsedTime)
+ if (logger.isDebugEnabled)
+ logger.debug("flush time " + elapsedTime)
+ setHighWaterMark.set(sizeInBytes)
+ if(logger.isDebugEnabled)
+ logger.debug("flush high water mark:" + highWaterMark)
+ }
+
+ /**
+ * Close this message set
+ */
+ def close() = {
+ if(mutable)
+ flush()
+ channel.close()
+ }
+
+ /**
+   * Recover the log up to the last complete entry, truncating off any bytes from incomplete messages at the end of the file
+ */
+ def recover(): Long = {
+ checkMutable()
+ val len = channel.size
+ val buffer = ByteBuffer.allocate(4)
+ var validUpTo: Long = 0
+ var next = 0L
+ do {
+ next = validateMessage(channel, validUpTo, len, buffer)
+ if(next >= 0)
+ validUpTo = next
+ } while(next >= 0)
+ channel.truncate(validUpTo)
+ setSize.set(validUpTo)
+ setHighWaterMark.set(validUpTo)
+ if(logger.isDebugEnabled)
+      logger.debug("recover high water mark:" + highWaterMark)
+ /* This should not be necessary, but fixes bug 6191269 on some OSs. */
+ channel.position(validUpTo)
+ needRecover.set(false)
+ len - validUpTo
+ }
+
+ /**
+   * Read and validate a single message starting at the given position, returning the offset of the
+   * next message, or -1 if the message is incomplete or fails validation
+ */
+ private def validateMessage(channel: FileChannel, start: Long, len: Long, buffer: ByteBuffer): Long = {
+ buffer.rewind()
+ var read = channel.read(buffer, start)
+ if(read < 4)
+ return -1
+
+ // check that we have sufficient bytes left in the file
+ val size = buffer.getInt(0)
+ if (size < Message.MinHeaderSize)
+ return -1
+
+ val next = start + 4 + size
+ if(next > len)
+ return -1
+
+ // read the message
+ val messageBuffer = ByteBuffer.allocate(size)
+ var curr = start + 4
+ while(messageBuffer.hasRemaining) {
+ read = channel.read(messageBuffer, curr)
+ if(read < 0)
+ throw new IllegalStateException("File size changed during recovery!")
+ else
+ curr += read
+ }
+ messageBuffer.rewind()
+ val message = new Message(messageBuffer)
+ if(!message.isValid)
+ return -1
+ else
+ next
+ }
+
+}
+
+trait LogFlushStatsMBean {
+ def getFlushesPerSecond: Double
+ def getAvgFlushMs: Double
+ def getMaxFlushMs: Double
+ def getNumFlushes: Long
+}
+
+@threadsafe
+class LogFlushStats extends LogFlushStatsMBean {
+ private val flushRequestStats = new SnapshotStats
+
+ def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
+
+ def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
+
+ def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
+
+ def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
+
+ def getNumFlushes: Long = flushRequestStats.getNumRequests
+}
+
+object LogFlushStats {
+ private val logger = Logger.getLogger(getClass())
+ private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
+ private val stats = new LogFlushStats
+ Utils.swallow(logger.warn, Utils.registerMBean(stats, LogFlushStatsMBeanName))
+
+ def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
+}
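
A usage sketch with a hypothetical segment file name: every on-disk entry is a 4-byte
size prefix followed by the message bytes, which is exactly what the iterator above
decodes, so scanning a segment is just:

    import java.io.File

    val set = new FileMessageSet(new File("00000000000000000000.kafka"), false) // open immutably
    try {
      for (mo <- set)
        println("message of " + mo.message.size + " bytes, next entry at byte " + mo.offset)
    } finally {
      set.close()
    }
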
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+/**
+ * Indicates that a message failed its checksum and is corrupt
+ */
+class InvalidMessageException extends RuntimeException
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.nio.channels._
+import java.util.zip.CRC32
+import java.util.UUID
+import kafka.utils._
+import kafka.common.UnknownMagicByteException
+
+/**
+ * Message byte offsets
+ */
+object Message {
+ val MagicVersion1: Byte = 0
+ val MagicVersion2: Byte = 1
+ val CurrentMagicValue: Byte = 1
+ val MagicOffset = 0
+ val MagicLength = 1
+ val AttributeOffset = MagicOffset + MagicLength
+ val AttributeLength = 1
+ /**
+ * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+ * 0 is reserved to indicate no compression
+ */
+  val CompressionCodeMask: Int = 0x03
+
+
+ val NoCompression:Int = 0
+
+ /**
+   * Computes the offset of the CRC field based on the magic byte
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1:
+   *              0 for the format without an attributes byte
+   *              1 for the format with an attributes byte
+ */
+ def crcOffset(magic: Byte): Int = magic match {
+ case MagicVersion1 => MagicOffset + MagicLength
+ case MagicVersion2 => AttributeOffset + AttributeLength
+ case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
+ }
+
+ val CrcLength = 4
+
+ /**
+ * Computes the offset to the message payload based on the magic byte
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1:
+   *              0 for the format without an attributes byte
+   *              1 for the format with an attributes byte
+ */
+ def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
+
+ /**
+ * Computes the size of the message header based on the magic byte
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1:
+   *              0 for the format without an attributes byte
+   *              1 for the format with an attributes byte
+ */
+ def headerSize(magic: Byte): Int = payloadOffset(magic)
+
+ /**
+ * Size of the header for magic byte 0. This is the minimum size of any message header
+ */
+  val MinHeaderSize = headerSize(0)
+}
+
+/**
+ * A message. The format of an N byte message is the following:
+ *
+ * If magic byte is 0
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 4 byte CRC32 of the payload
+ *
+ * 3. N - 5 byte payload
+ *
+ * If magic byte is 1
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+ *
+ * 3. 4 byte CRC32 of the payload
+ *
+ * 4. N - 6 byte payload
+ *
+ */
+class Message(val buffer: ByteBuffer) {
+
+ import kafka.message.Message._
+
+
+ private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+ this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length))
+ buffer.put(CurrentMagicValue)
+ var attributes:Byte = 0
+ if (compressionCodec.codec > 0) {
+ attributes = (attributes | (Message.CompressionCodeMask & compressionCodec.codec)).toByte
+ }
+ buffer.put(attributes)
+ Utils.putUnsignedInt(buffer, checksum)
+ buffer.put(bytes)
+ buffer.rewind()
+ }
+
+ def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec)
+
+ def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+ //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
+ this(Utils.crc32(bytes), bytes, compressionCodec)
+ }
+
+ def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec)
+
+ def size: Int = buffer.limit
+
+ def payloadSize: Int = size - headerSize(magic)
+
+ def magic: Byte = buffer.get(MagicOffset)
+
+ def attributes: Byte = buffer.get(AttributeOffset)
+
+ def compressionCodec:CompressionCodec = {
+ magic match {
+ case 0 => NoCompressionCodec
+ case 1 => CompressionCodec.getCompressionCodec(buffer.get(AttributeOffset) & CompressionCodeMask)
+ case _ => throw new RuntimeException("Invalid magic byte " + magic)
+ }
+
+ }
+
+ def checksum: Long = Utils.getUnsignedInt(buffer, crcOffset(magic))
+
+ def payload: ByteBuffer = {
+ var payload = buffer.duplicate
+ payload.position(headerSize(magic))
+ payload = payload.slice()
+ payload.limit(payloadSize)
+ payload.rewind()
+ payload
+ }
+
+ def isValid: Boolean =
+ checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + payloadOffset(magic), payloadSize)
+
+ def serializedSize: Int = 4 /* int size*/ + buffer.limit
+
+ def serializeTo(serBuffer:ByteBuffer) = {
+ serBuffer.putInt(buffer.limit)
+ serBuffer.put(buffer.duplicate)
+ }
+
+ override def toString(): String =
+ "message(magic = %d, attributes = %d, crc = %d, payload = %s)".format(magic, attributes, checksum, payload)
+
+ override def equals(any: Any): Boolean = {
+ any match {
+ case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum &&
+ payload == that.payload && magic == that.magic
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int = buffer.hashCode
+
+}
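+
+// Illustrative only (not part of the original commit): a minimal sketch showing
+// that a freshly constructed message passes its own CRC check, since the
+// auxiliary constructors above compute the checksum over the payload bytes.
+private[kafka] object MessageExample {
+ def roundTripIsValid: Boolean = new Message("hello".getBytes).isValid
+}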
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+/**
+ * Represents a message paired with the offset of the next message. This is used when iterating over a MessageSet
+ */
+case class MessageAndOffset(message: Message, offset: Long)
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+/**
+ * Indicates the presence of a message that exceeds the maximum acceptable
+ * length (whatever that happens to be)
+ */
+class MessageLengthException(message: String) extends RuntimeException(message)
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.nio.channels._
+
+/**
+ * Message set helper functions
+ */
+object MessageSet {
+
+ val LogOverhead = 4
+ val Empty: MessageSet = new ByteBufferMessageSet(ByteBuffer.allocate(0))
+
+ /**
+ * The size of a message set containing the given messages
+ */
+ def messageSetSize(messages: Iterable[Message]): Int =
+ messages.foldLeft(0)(_ + entrySize(_))
+
+ /**
+ * The size of a list of messages
+ */
+ def messageSetSize(messages: java.util.List[Message]): Int = {
+ var size = 0
+ val iter = messages.iterator
+ while(iter.hasNext) {
+ size += entrySize(iter.next)
+ }
+ size
+ }
+
+ /**
+ * The size of a size-delimited entry in a message set
+ */
+ def entrySize(message: Message): Int = LogOverhead + message.size
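+ // For example, a 9 byte message occupies entrySize = 4 + 9 = 13 bytes in the set,
+ // since each entry is preceded by the 4 byte (LogOverhead) size field.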
+
+}
+
+/**
+ * A set of messages. A message set has a fixed serialized form, though the container
+ * for the bytes could be either in-memory or on disk. The format of each entry is
+ * as follows:
+ * 4 byte size containing an integer N
+ * N message bytes as described in the message class
+ */
+abstract class MessageSet extends Iterable[MessageAndOffset] {
+
+ /**
+ * Write the messages in this set to the given channel starting at the given offset byte.
+ * Less than the complete amount may be written, but no more than maxSize can be. The number
+ * of bytes written is returned
+ */
+ def writeTo(channel: WritableByteChannel, offset: Long, maxSize: Long): Long
+
+ /**
+ * Provides an iterator over the messages in this set
+ */
+ def iterator: Iterator[MessageAndOffset]
+
+ /**
+ * Gives the total size of this message set in bytes
+ */
+ def sizeInBytes: Long
+
+ /**
+ * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't
+ * match the payload for any message.
+ */
+ def validate(): Unit = {
+ for(messageAndOffset <- this)
+ if(!messageAndOffset.message.isValid)
+ throw new InvalidMessageException
+ }
+
+}
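+
+// Illustrative only (not part of the original commit): a sketch of validating a
+// set before use; InvalidMessageException propagates if any CRC check fails.
+private[kafka] object MessageSetExample {
+ def checked(messages: MessageSet): MessageSet = {
+ messages.validate()
+ messages
+ }
+}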
Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+Messages and everything related to them.
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+/**
+ * Represents a size-delimited transmission received from a channel. The size
+ * is bounded by maxSize to guard against oversized or corrupt requests
+ */
+@nonthreadsafe
+private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive {
+
+ private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
+ private var contentBuffer: ByteBuffer = null
+
+ def this() = this(Int.MaxValue)
+
+ var complete: Boolean = false
+
+ /**
+ * Get the content buffer for this transmission
+ */
+ def buffer: ByteBuffer = {
+ expectComplete()
+ contentBuffer
+ }
+
+ /**
+ * Read the bytes in this response from the given channel
+ */
+ def readFrom(channel: ReadableByteChannel): Int = {
+ expectIncomplete()
+ var read = 0
+ // have we read the request size yet?
+ if(sizeBuffer.remaining > 0)
+ read += Utils.read(channel, sizeBuffer)
+ // have we allocated the request buffer yet?
+ if(contentBuffer == null && !sizeBuffer.hasRemaining) {
+ sizeBuffer.rewind()
+ val size = sizeBuffer.getInt()
+ if(size <= 0 || size > maxSize)
+ throw new InvalidRequestException(size + " is not a valid message size")
+ contentBuffer = byteBufferAllocate(size)
+ }
+ // if we have a buffer, read into it and count those bytes as well
+ if(contentBuffer != null) {
+ read += Utils.read(channel, contentBuffer)
+ // did we get everything?
+ if(!contentBuffer.hasRemaining) {
+ contentBuffer.rewind()
+ complete = true
+ }
+ }
+ read
+ }
+
+ private def byteBufferAllocate(size: Int): ByteBuffer = {
+ try {
+ ByteBuffer.allocate(size)
+ } catch {
+ case e: OutOfMemoryError =>
+ throw new RuntimeException("OOME with size " + size, e)
+ }
+ }
+}
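+
+// Illustrative usage sketch (not part of the original commit): drive readFrom
+// until the full size-delimited transmission has arrived, then hand back the
+// content buffer.
+private[kafka] object BoundedByteBufferReceiveExample {
+ def readFully(channel: ReadableByteChannel, maxSize: Int): ByteBuffer = {
+ val receive = new BoundedByteBufferReceive(maxSize)
+ while(!receive.complete)
+ receive.readFrom(channel)
+ receive.buffer
+ }
+}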
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+@nonthreadsafe
+private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
+
+ private val sizeBuffer = ByteBuffer.allocate(4)
+
+ sizeBuffer.putInt(buffer.limit)
+ sizeBuffer.rewind()
+
+ var complete: Boolean = false
+
+ def this(size: Int) = this(ByteBuffer.allocate(size))
+
+ def this(request: Request) = {
+ this(request.sizeInBytes + 2) // 2 extra bytes to hold the 2 byte (short) request id
+ buffer.putShort(request.id)
+ request.writeTo(buffer)
+ buffer.rewind()
+ }
+
+ def writeTo(channel: WritableByteChannel): Int = {
+ expectIncomplete()
+ var written = 0
+ // try to write the size if we haven't already
+ if(sizeBuffer.hasRemaining)
+ written += channel.write(sizeBuffer)
+ // try to write the actual buffer itself
+ if(!sizeBuffer.hasRemaining && buffer.hasRemaining)
+ written += channel.write(buffer)
+ // if we are done, mark it off
+ if(!buffer.hasRemaining)
+ complete = true
+
+ written
+ }
+
+}
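+
+// Illustrative usage sketch (not part of the original commit): drain any Send
+// to a channel until it reports completion, returning the total bytes written.
+private[kafka] object BoundedByteBufferSendExample {
+ def writeFully(send: Send, channel: WritableByteChannel): Long = {
+ var total = 0L
+ while(!send.complete)
+ total += send.writeTo(channel)
+ total
+ }
+}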
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+@nonthreadsafe
+private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send {
+
+ var complete: Boolean = false
+
+ def this(size: Int) = this(ByteBuffer.allocate(size))
+
+ def writeTo(channel: WritableByteChannel): Int = {
+ expectIncomplete()
+ var written = 0
+ written += channel.write(buffer)
+ if(!buffer.hasRemaining)
+ complete = true
+ written
+ }
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+trait ConnectionConfig {
+ val host: String
+ val port: Int
+ val sendBufferSize: Int = -1
+ val receiveBufferSize: Int = -1
+ val tcpNoDelay = true
+ val keepAlive = false
+}
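+
+// Illustrative only (not part of the original commit): a minimal concrete
+// configuration mixing in the trait; the host and port values are hypothetical.
+private[kafka] object ExampleConnectionConfig extends ConnectionConfig {
+ val host = "localhost"
+ val port = 9092
+}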
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+private[kafka] object Handler {
+
+ /**
+ * A request handler is a function that turns an incoming
+ * transmission into an outgoing transmission
+ */
+ type Handler = Receive => Option[Send]
+
+ /**
+ * A handler mapping finds the right Handler function for a given request
+ */
+ type HandlerMapping = (Short, Receive) => Handler
+
+}
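+
+// Illustrative only (not part of the original commit): an echo-style handler
+// matching the Handler type alias; it wraps the received bytes in a send
+// without interpreting them.
+private[kafka] object HandlerExample {
+ val echo: Handler.Handler = (receive: Receive) => Some(new BoundedByteBufferSend(receive.buffer))
+}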
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+class InvalidRequestException(val message: String) extends RuntimeException(message) {
+
+ def this() = this("")
+
+}
Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+
+private[kafka] abstract class Request(val id: Short) {
+
+ def sizeInBytes: Int
+
+ def writeTo(buffer: ByteBuffer): Unit
+
+}
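+
+// Illustrative only (not part of the original commit): a minimal concrete
+// request carrying a single int; the id value 0 is hypothetical.
+private[kafka] class ExampleIntRequest(value: Int) extends Request(0) {
+ def sizeInBytes: Int = 4
+ def writeTo(buffer: ByteBuffer): Unit = buffer.putInt(value)
+}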