You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [17/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic._
+import java.text.NumberFormat
+import java.io._
+import java.nio.channels.FileChannel
+import org.apache.log4j._
+import kafka.message._
+import kafka.utils._
+import kafka.common._
+import kafka.api.OffsetRequest
+import java.util._
+
+private[log] object Log {
+  /* Suffix carried by every log segment file name */
+  val FileSuffix = ".kafka"
+
+  /**
+   * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
+   * but instead of checking for equality looks within the range. Takes the array size as an option in case
+   * the array grows while searching happens
+   *
+   * The binary search only works if the first arraySize elements are ordered by ascending start.
+   *
+   * @param ranges the ranges to search
+   * @param value the value to locate
+   * @param arraySize the number of leading elements of ranges to search
+   * @return Some(range) for the range containing value; None if there is nothing to search, if value is exactly
+   *         the end of the last range, or if no searched range contains it
+   * @throws OffsetOutOfRangeException if value is before the first range or beyond the end of the last one
+   *
+   * TODO: This should move into SegmentList.scala
+   */
+  def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
+    // nothing to search; the arraySize check also keeps ranges(arraySize - 1) below from indexing -1
+    if(ranges.size < 1 || arraySize < 1)
+      return None
+
+    val first = ranges(0)
+    val last = ranges(arraySize - 1)
+    val end = last.start + last.size
+
+    // check out of bounds
+    if(value < first.start || value > end)
+      throw new OffsetOutOfRangeException("offset " + value + " is out of range")
+
+    // check at the end
+    if (value == end)
+      return None
+
+    var low = 0
+    var high = arraySize - 1
+    while(low <= high) {
+      val mid = low + (high - low) / 2
+      val found = ranges(mid)
+      if(found.contains(value))
+        return Some(found)
+      else if (value < found.start)
+        high = mid - 1
+      else
+        low = mid + 1
+    }
+    None
+  }
+
+  /**
+   * Same as the three argument findRange, searching the whole array
+   */
+  def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
+    findRange(ranges, value, ranges.length)
+
+  /**
+   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
+   * so that ls sorts the files numerically
+   *
+   * @param offset the start offset of the segment
+   * @return the offset padded to at least 20 digits, followed by FileSuffix
+   */
+  def nameFromOffset(offset: Long): String = {
+    val nf = NumberFormat.getInstance()
+    nf.setMinimumIntegerDigits(20)
+    nf.setMaximumFractionDigits(0)
+    nf.setGroupingUsed(false)
+    nf.format(offset) + Log.FileSuffix
+  }
+}
+
+/**
+ * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
+ */
+private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+  /* flag that can be flipped by other threads, hence volatile */
+  @volatile var deleted = false
+
+  /* the size of the segment is the high water mark of its message set */
+  def size: Long = messageSet.highWaterMark
+
+  override def toString() = {
+    val fields = Seq("file=" + file, "start=" + start, "size=" + size)
+    fields.mkString("(", ", ", ")")
+  }
+}
+
+
+/**
+ * An append-only log for storing messages. 
+ */
+@threadsafe
+private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) {
+
+  /* log4j logger for this class */
+  private val logger = Logger.getLogger(classOf[Log])
+
+  /* A lock that guards all modifications to the log. It is declared before `segments` because
+   * loadSegments() calls validateSegments(), which synchronizes on it while the object is being constructed */
+  private val lock = new Object
+
+  /* The current number of unflushed messages appended to the log; reset to 0 by flush() */
+  private val unflushed = new AtomicInteger(0)
+
+  /* last time it was flushed (System.currentTimeMillis); starts at construction time */
+  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+
+  /* The actual segments of the log */
+  private[log] val segments: SegmentList[LogSegment] = loadSegments()
+
+  /* The name of this log: the name of its directory */
+  val name  = dir.getName()
+
+  /* Statistics holder for this log, handed to Utils.registerMBean below */
+  private val logStats = new LogStats(this)
+
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+
+  /**
+   * Load the log segments from the log files on disk. Every file in the log directory ending in
+   * Log.FileSuffix is opened read-only; the last one (by start offset) is then reopened in mutable mode.
+   * If there is no segment file at all a fresh mutable segment starting at offset 0 is created.
+   */
+  private def loadSegments(): SegmentList[LogSegment] = {
+    // open all the segments read-only
+    val loaded = new ArrayList[LogSegment]
+    val files = dir.listFiles()
+    if(files != null) {
+      for(f <- files if f.isFile && f.toString.endsWith(Log.FileSuffix)) {
+        if(!f.canRead)
+          throw new IOException("Could not read file " + f)
+        // the file name without the suffix is the start offset of the segment
+        val baseName = f.getName().dropRight(Log.FileSuffix.length)
+        loaded.add(new LogSegment(f, new FileMessageSet(f, false), baseName.toLong))
+      }
+    }
+
+    if(loaded.size != 0) {
+      // there is at least one existing segment, validate and recover them/it
+      // sort segments into ascending order for fast searching
+      Collections.sort(loaded, new Comparator[LogSegment] {
+        def compare(a: LogSegment, b: LogSegment): Int =
+          if(a.start < b.start) -1
+          else if(a.start > b.start) 1
+          else 0
+      })
+      validateSegments(loaded)
+
+      // make the final section mutable and run recovery on it if necessary
+      val lastIndex = loaded.size - 1
+      val readOnlyLast = loaded.remove(lastIndex)
+      readOnlyLast.messageSet.close()
+      logger.info("Loading the last segment " + readOnlyLast.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
+      val mutableSet = new FileMessageSet(readOnlyLast.file, true, new AtomicBoolean(needRecovery))
+      loaded.add(new LogSegment(readOnlyLast.file, mutableSet, readOnlyLast.start))
+    } else {
+      // no existing segments, create a new mutable segment
+      val firstFile = new File(dir, Log.nameFromOffset(0))
+      loaded.add(new LogSegment(firstFile, new FileMessageSet(firstFile, true), 0))
+    }
+    new SegmentList(loaded.toArray(new Array[LogSegment](loaded.size)))
+  }
+
+  /**
+   * Check that each segment ends exactly where the following one starts, otherwise we have lost
+   * some data somewhere. Throws IllegalStateException naming the first pair that does not line up.
+   */
+  private def validateSegments(segments: ArrayList[LogSegment]) {
+    lock synchronized {
+      var i = 1
+      while(i < segments.size) {
+        val prev = segments.get(i - 1)
+        val curr = segments.get(i)
+        if(prev.start + prev.size != curr.start)
+          throw new IllegalStateException("The following segments don't validate: " +
+                  prev.file.getAbsolutePath() + ", " + curr.file.getAbsolutePath())
+        i += 1
+      }
+    }
+  }
+
+  /**
+   * The number of segments in the current view of the log
+   */
+  def numberOfSegments: Int = {
+    val snapshot = segments.view
+    snapshot.length
+  }
+
+  /**
+   * Close this log: closes the message set of every segment while holding the log lock
+   */
+  def close() {
+    lock synchronized {
+      segments.view.foreach(segment => segment.messageSet.close())
+    }
+  }
+
+  /**
+   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+   * Nothing is returned; use nextAppendOffset or getHighwaterMark to find out where the log ends.
+   *
+   * @param messages the messages to append; every one of them must be valid
+   * @throws InvalidMessageException if any message fails its validity check, in which case nothing is appended
+   */
+  def append(messages: MessageSet): Unit = {
+    // validate the messages
+    var numberOfMessages = 0
+    for(messageAndOffset <- messages) {
+      if(!messageAndOffset.message.isValid)
+        throw new InvalidMessageException()
+      numberOfMessages += 1
+    }
+
+    // they are valid, insert them in the log
+    lock synchronized {
+      val segment = segments.view.last
+      segment.messageSet.append(messages)
+      // count the messages only once they have been handed to the segment, so a failed append is not counted
+      logStats.recordAppendedMessages(numberOfMessages)
+      maybeFlush(numberOfMessages)
+      maybeRoll(segment)
+    }
+  }
+
+  /**
+   * Read from the log file at the given offset. The segment containing the offset is looked up in a
+   * snapshot of the segment list; if Log.findRange finds none, MessageSet.Empty is returned.
+   */
+  def read(offset: Long, length: Int): MessageSet = {
+    val snapshot = segments.view
+    Log.findRange(snapshot, offset, snapshot.length)
+      .map(segment => segment.messageSet.read((offset - segment.start), length))
+      .getOrElse(MessageSet.Empty)
+  }
+
+  /**
+   * Mark as deleted the leading run of segments matching the given predicate, remove them from the
+   * segment list and return them. Files are not touched here.
+   */
+  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
+    lock synchronized {
+      val current = segments.view
+      val expired = current.takeWhile(predicate)
+      expired.foreach(segment => segment.deleted = true)
+      // if we are deleting everything, create a new empty segment
+      if(expired.size == current.size)
+        roll()
+      segments.trunc(expired.size)
+    }
+  }
+
+  /**
+   * Get the size of the log in bytes: the sum of the sizes of all segments in the current view
+   */
+  def size: Long = {
+    var total = 0L
+    for(segment <- segments.view)
+      total += segment.size
+    total
+  }
+
+  /**
+   * The byte offset of the message that will be appended next. Note that this flushes the log first.
+   */
+  def nextAppendOffset: Long = {
+    flush()
+    val active = segments.view.last
+    active.start + active.size
+  }
+
+  /**
+   * Get the current high watermark of the log, i.e. that of the last segment's message set
+   */
+  def getHighwaterMark: Long = {
+    val active = segments.view.last
+    active.messageSet.highWaterMark
+  }
+
+  /**
+   * Roll the log over if the given segment has grown beyond maxSize
+   */
+  private def maybeRoll(segment: LogSegment) {
+    val isFull = segment.messageSet.sizeInBytes > maxSize
+    if(isFull)
+      roll()
+  }
+
+  /**
+   * Create a new segment and make it active. The new segment starts at nextAppendOffset (which
+   * flushes the log) and its file is named after that offset.
+   */
+  def roll() {
+    lock synchronized {
+      val newOffset = nextAppendOffset
+      val newFile = new File(dir, Log.nameFromOffset(newOffset))
+      if(logger.isDebugEnabled)
+        logger.debug("Rolling log '" + name + "' to " + newFile.getName())
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+    }
+  }
+
+  /**
+   * Add the given number of messages to the unflushed count and flush the log once that count
+   * reaches flushInterval
+   */
+  private def maybeFlush(numberOfMessages : Int) {
+    val pending = unflushed.addAndGet(numberOfMessages)
+    if(pending >= flushInterval)
+      flush()
+  }
+
+  /**
+   * Flush the active segment of this log to the physical disk, then reset the unflushed message
+   * count and record the flush time
+   */
+  def flush() = {
+    lock synchronized {
+      if(logger.isDebugEnabled) {
+        val message = "Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+          System.currentTimeMillis
+        logger.debug(message)
+      }
+      val active = segments.view.last
+      active.messageSet.flush()
+      unflushed.set(0)
+      lastflushedTime.set(System.currentTimeMillis)
+    }
+  }
+
+  /**
+   * Get a list of offsets at or before the time given in the request, latest first.
+   *
+   * The candidates are the start offset of every segment (timestamped with the last modification time
+   * of its file) plus, if the active segment is not empty, the current end of the log (timestamped now).
+   *
+   * @param request carries the time to search for (or OffsetRequest.LatestTime / OffsetRequest.EarliestTime)
+   *                and the maximum number of offsets to return
+   * @return at most request.maxNumOffsets offsets in descending order; empty if no candidate qualifies
+   */
+  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+    val segsArray = segments.view
+    val lastSegment = segsArray.last
+    // read the size of the active segment exactly once: it can grow concurrently, and the array below
+    // must be sized and filled using the same answer
+    val lastSegmentSize = lastSegment.size
+    val hasLogEnd = lastSegmentSize > 0
+    val offsetTimeArray = new Array[Tuple2[Long, Long]](if(hasLogEnd) segsArray.length + 1 else segsArray.length)
+
+    for (i <- 0 until segsArray.length)
+      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
+    if (hasLogEnd)
+      offsetTimeArray(segsArray.length) = (lastSegment.start + lastSegmentSize, SystemTime.milliseconds)
+
+    var startIndex = -1
+    request.time match {
+      case OffsetRequest.LatestTime =>
+        startIndex = offsetTimeArray.length - 1
+      case OffsetRequest.EarliestTime =>
+        startIndex = 0
+      case _ =>
+          var isFound = false
+          if(logger.isDebugEnabled) {
+            // render the entries; foreach would only yield Unit here
+            logger.debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("[", "; ", "]"))
+          }
+          // walk backwards to the latest entry that is not newer than the requested time
+          startIndex = offsetTimeArray.length - 1
+          while (startIndex >= 0 && !isFound) {
+            if (offsetTimeArray(startIndex)._2 <= request.time)
+              isFound = true
+            else
+              startIndex -=1
+          }
+    }
+
+    // never negative, even if the request asks for a negative number of offsets
+    val retSize = request.maxNumOffsets.min(startIndex + 1).max(0)
+    val ret = new Array[Long](retSize)
+    for (j <- 0 until retSize) {
+      ret(j) = offsetTimeArray(startIndex)._1
+      startIndex -= 1
+    }
+    ret
+  }
+
+  /**
+   * The topic this log belongs to: the log name up to (not including) its last '-'.
+   * A name without any '-' is returned unchanged instead of failing in substring.
+   */
+  def getTopicName():String = {
+    val dash = name.lastIndexOf("-")
+    if(dash < 0) name else name.substring(0, dash)
+  }
+
+  /* The time of the last flush, as recorded by flush() */
+  def getLastFlushedTime():Long = lastflushedTime.get
+}
+  

Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import org.apache.log4j.Logger
+import kafka.utils._
+import scala.actors.Actor
+import scala.collection._
+import java.util.concurrent.CountDownLatch
+import kafka.server.{KafkaConfig, KafkaZooKeeper}
+import kafka.common.{InvalidTopicException, InvalidPartitionException}
+
+/**
+ * The guy who creates and hands out logs
+ */
+@threadsafe
+private[kafka] class LogManager(val config: KafkaConfig,
+                                private val scheduler: KafkaScheduler,
+                                private val time: Time,
+                                val logCleanupIntervalMs: Long,
+                                val logCleanupDefaultAgeMs: Long,
+                                needRecovery: Boolean) {
+  
+  /* The directory holding one sub directory per log */
+  val logDir: File = new File(config.logDir)
+  /* Default number of partitions for topics without an entry in topicPartitionsMap */
+  private val numPartitions = config.numPartitions
+  /* Passed to every Log as its maxSize */
+  private val maxSize: Long = config.logFileSize
+  /* Passed to every Log as its flushInterval */
+  private val flushInterval = config.flushInterval
+  /* Per-topic overrides of the number of partitions */
+  private val topicPartitionsMap = config.topicPartitionsMap
+  private val logger = Logger.getLogger(classOf[LogManager])
+  /* Held while a new log directory and Log are created */
+  private val logCreationLock = new Object
+  /* Source of randomness for chooseRandomPartition */
+  private val random = new java.util.Random
+  /* Both stay null unless config.enableZookeeper is set */
+  private var kafkaZookeeper: KafkaZooKeeper = null
+  private var zkActor: Actor = null
+  /* Counted down at the end of the zookeeper part of startup(); null when zookeeper is disabled */
+  private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
+  /* Scheduler that runs flushAllLogs once startup() has been called */
+  private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
+  /* Per-topic overrides of the flush interval, keyed by topic */
+  private val logFlushIntervalMap = config.flushIntervalMap
+  /* Per-topic retention in milliseconds, converted from the configured hours */
+  private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+
+  /* All logs known to this manager, keyed by topic and then by partition */
+  private val logs = new Pool[String, Pool[Int, Log]]()
+
+  /* Make sure the log directory exists and is readable before loading anything from it */
+  if(!logDir.exists()) {
+    logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
+    logDir.mkdirs()
+  }
+  if(!(logDir.isDirectory() && logDir.canRead()))
+    throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.")
+
+  /* Initialize a log for each subdirectory of the main log directory */
+  val subDirs = logDir.listFiles()
+  if(subDirs != null) {
+    for(dir <- subDirs) {
+      if(dir.isDirectory()) {
+        logger.info("Loading log '" + dir.getName() + "'")
+        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val topicAndPartition = Utils.getTopicPartition(dir.getName)
+        logs.putIfNotExists(topicAndPartition._1, new Pool[Int, Log]())
+        logs.get(topicAndPartition._1).put(topicAndPartition._2, log)
+      } else {
+        logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
+      }
+    }
+  }
+
+  /* Schedule the cleanup task to delete old logs */
+  if(scheduler != null) {
+    logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms")
+    scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
+  }
+
+  /* When zookeeper is enabled, start the zookeeper client and an actor that registers the topics
+   * sent to it (see registerNewTopicInZK) until it receives StopActor */
+  if(config.enableZookeeper) {
+    kafkaZookeeper = new KafkaZooKeeper(config, this)
+    kafkaZookeeper.startup
+    zkActor = new Actor {
+      def act() {
+        loop {
+          receive {
+            case topic: String =>
+              try {
+                kafkaZookeeper.registerTopicInZk(topic)
+              }
+              catch {
+                // log it (with the topic and the stack trace) and let it go, so the actor keeps serving
+                case e => logger.error("Failed to register topic '" + topic + "' in ZooKeeper", e)
+              }
+            case StopActor =>
+              logger.info("zkActor stopped")
+              exit
+          }
+        }
+      }
+    }
+    zkActor.start
+  }
+
+  /* Message telling zkActor to stop; sent by close() */
+  case object StopActor
+
+  /**
+   * Convert a per-topic retention given in hours into a per-topic retention in milliseconds
+   *
+   * @param logRetentionHourMap retention in hours, keyed by topic
+   * @return retention in milliseconds, keyed by topic
+   */
+  private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
+    val ret = new mutable.HashMap[String, Long]
+    // widen to Long before multiplying: hour * 60 * 60 in Int arithmetic overflows for large values
+    for ( (topic, hour) <- logRetentionHourMap )
+      ret.put(topic, hour.toLong * 60 * 60 * 1000)
+    ret
+  }
+
+  /**
+   * Register this broker and every topic already loaded in ZK (when zookeeper is enabled), release
+   * anybody waiting in awaitStartup, and start the periodic log flusher.
+   */
+  def startup() {
+    if(config.enableZookeeper) {
+      kafkaZookeeper.registerBrokerInZk()
+      getAllTopics.foreach(topic => kafkaZookeeper.registerTopicInZk(topic))
+      startupLatch.countDown
+    }
+    val announcement = "Starting log flusher every " + config.flushSchedulerThreadRate +
+      " ms with the following overrides " + logFlushIntervalMap
+    logger.info(announcement)
+    logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
+  }
+
+  /* Block until startup() has counted the latch down; returns at once when zookeeper is disabled */
+  private def awaitStartup() {
+    if (config.enableZookeeper) {
+      startupLatch.await
+    }
+  }
+
+  /* Hand the topic to zkActor, which registers it in zookeeper; does nothing when zookeeper is disabled */
+  def registerNewTopicInZK(topic: String) {
+    if (config.enableZookeeper) {
+      zkActor ! topic
+    }
+  }
+
+  /**
+   * Create a log for the given topic and the given partition. The log lives in the directory
+   * "topic-partition" below logDir and is opened without recovery.
+   */
+  private def createLog(topic: String, partition: Int): Log = {
+    logCreationLock synchronized {
+      val dirName = topic + "-" + partition
+      val partitionDir = new File(logDir, dirName)
+      partitionDir.mkdirs()
+      new Log(partitionDir, maxSize, flushInterval, false)
+    }
+  }
+  
+
+  /* Pick a random partition in [0, number of partitions of the topic) */
+  def chooseRandomPartition(topic: String): Int = {
+    val partitionCount = topicPartitionsMap.getOrElse(topic, numPartitions)
+    random.nextInt(partitionCount)
+  }
+
+  /**
+   * Create the log if it does not exist, if it exists just return it.
+   * Waits for startup to complete first (see awaitStartup).
+   *
+   * @throws InvalidTopicException if the topic name is empty
+   * @throws InvalidPartitionException if the partition is outside [0, number of partitions of the topic)
+   */
+  def getOrCreateLog(topic: String, partition: Int): Log = {
+    awaitStartup
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name can't be empty")
+    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
+      logger.warn("Wrong partition " + partition + " valid partitions (0," +
+              (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
+      throw new InvalidPartitionException("wrong partition " + partition)
+    }
+    var hasNewTopic = false
+    var parts = logs.get(topic)
+    if (parts == null) {
+      // NOTE(review): this code treats a null result of putIfNotExists as "our pool was inserted" -- confirm against Pool
+      val found = logs.putIfNotExists(topic, new Pool[Int, Log])
+      if (found == null)
+        hasNewTopic = true
+      // re-read so that we use whichever pool actually ended up registered
+      parts = logs.get(topic)
+    }
+    var log = parts.get(partition)
+    if(log == null) {
+      log = createLog(topic, partition)
+      val found = parts.putIfNotExists(partition, log)
+      if(found != null) {
+        // there was already somebody there
+        log.close()
+        log = found
+      }
+      else
+        logger.info("Created log for '" + topic + "'-" + partition)
+    }
+
+    // only the caller that registered the topic's pool announces the topic
+    if (hasNewTopic)
+      registerNewTopicInZK(topic)
+    log
+  }
+  
+  /**
+   * Delete any eligible log segments: in every log, the leading segments whose files were last modified
+   * longer ago than the retention of their topic (or the default cleanup age). The number of files
+   * deleted is only logged, not returned.
+   */
+  def cleanupLogs() {
+    logger.debug("Beginning log cleanup...")
+    val allLogs = getLogIterator
+    var deletedFiles = 0
+    val startMs = time.milliseconds
+    for(log <- allLogs) {
+      logger.debug("Garbage collecting '" + log.name + "'")
+      val topic = Utils.getTopicPartition(log.dir.getName)._1
+      // a per-topic retention takes precedence over the default cleanup age
+      val retentionMs =
+        if (logRetentionMSMap.contains(topic)) logRetentionMSMap(topic)
+        else this.logCleanupDefaultAgeMs
+      val expired = log.markDeletedWhile(segment => startMs - segment.file.lastModified > retentionMs)
+      for(segment <- expired) {
+        logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+        Utils.swallow(logger.warn, segment.messageSet.close())
+        if(segment.file.delete())
+          deletedFiles += 1
+        else
+          logger.warn("Delete failed.")
+      }
+    }
+    val elapsedSeconds = (time.milliseconds - startMs) / 1000
+    logger.debug("Log cleanup completed. " + deletedFiles + " files deleted in " +
+                 elapsedSeconds + " seconds")
+  }
+  
+  /**
+   * Shut down the log flusher, close all the logs and, when zookeeper is enabled, stop the
+   * zookeeper actor and close the zookeeper client
+   */
+  def close() {
+    logFlusherScheduler.shutdown
+    getLogIterator.foreach(log => log.close())
+    if (config.enableZookeeper) {
+      zkActor ! StopActor
+      kafkaZookeeper.close
+    }
+  }
+  
+  /* An iterator over every log of every topic, walking the per-topic pools one after another */
+  private def getLogIterator(): Iterator[Log] = {
+    new IteratorTemplate[Log] {
+      val partsIter = logs.values.iterator
+      var logIter: Iterator[Log] = null
+
+      override def makeNext(): Log = {
+        // move on to the next pool until the current one has a log left or the pools are exhausted
+        while (logIter == null || !logIter.hasNext) {
+          if (!partsIter.hasNext)
+            return allDone
+          logIter = partsIter.next.values.iterator
+        }
+        logIter.next
+      }
+    }
+  }
+
+  /* Flush every log whose last flush is at least its flush interval ago (the per-topic override if
+   * there is one, the configured default otherwise). An IOException while flushing halts the JVM. */
+  private def flushAllLogs() = {
+    if (logger.isDebugEnabled)
+      logger.debug("flushing the high watermark of all logs")
+
+    getLogIterator.foreach { log =>
+      try {
+        val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
+        var logFlushInterval = config.defaultFlushIntervalMs
+        if(logFlushIntervalMap.contains(log.getTopicName))
+          logFlushInterval = logFlushIntervalMap(log.getTopicName)
+        if (logger.isDebugEnabled)
+          logger.debug(log.getTopicName + " flush interval  " + logFlushInterval +
+            " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
+        if(timeSinceLastFlush >= logFlushInterval)
+          log.flush
+      }
+      catch {
+        case e: IOException =>
+          logger.error("error flushing " + log.getTopicName, e)
+          logger.error("force shutdown due to error in flushAllLogs" + e)
+          Runtime.getRuntime.halt(1)
+        case e =>
+          logger.error("error flushing " + log.getTopicName, e)
+      }
+    }
+  }
+
+
+  /* Iterator over the keys of the log pool, i.e. the topics that have an entry in it */
+  def getAllTopics(): Iterator[String] = logs.keys.iterator
+  /* The per-topic partition counts, as read from the config */
+  def getTopicPartitionsMap() = topicPartitionsMap
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogStats.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * The statistics exposed for a single log. Implemented by LogStats below.
+ */
+trait LogStatsMBean {
+  /* the name of the log */
+  def getName(): String
+  /* the size of the log */
+  def getSize(): Long
+  /* the number of segments of the log */
+  def getNumberOfSegments: Int
+  /* the current offset of the log */
+  def getCurrentOffset: Long
+  /* the number of messages recorded as appended */
+  def getNumAppendedMessages: Long
+}
+
+/**
+ * Statistics of one log. Everything except the appended message count is read from the log on demand.
+ */
+class LogStats(val log: Log) extends LogStatsMBean {
+  /* running total of the messages reported through recordAppendedMessages */
+  private val numCumulatedMessages = new AtomicLong(0)
+
+  def getName(): String = log.name
+  
+  def getSize(): Long = log.size
+  
+  def getNumberOfSegments: Int = log.numberOfSegments
+  
+  /* the current offset is the high watermark of the log */
+  def getCurrentOffset: Long = log.getHighwaterMark
+  
+  def getNumAppendedMessages: Long = numCumulatedMessages.get
+
+  /* Add nMessages to the running total; yields the total as it was before the addition (getAndAdd) */
+  def recordAppendedMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.concurrent.atomic._
+import reflect._
+import scala.math._
+
+private[log] object SegmentList {
+  /* NOTE(review): not referenced by the SegmentList class in this file; presumably meant as a bound
+   * on the retries of its compare-and-set loops -- confirm before relying on it */
+  val MaxAttempts: Int = 20
+}
+
+/**
+ * A copy-on-write list implementation that provides consistent views. The view() method
+ * provides an immutable sequence representing a consistent state of the list. The user can do
+ * iterative operations on this sequence such as binary search without locking all access to the list.
+ * Even if the range of the underlying list changes no change will be made to the view
+ */
+private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) {
+  
+  /* The current contents. append and trunc never modify this array, they swap in a new one */
+  val contents: AtomicReference[Array[T]] = new AtomicReference(seq.toArray)
+
+  /**
+   * Append the given items to the end of the list
+   */
+  def append(ts: T*)(implicit m: ClassManifest[T]) {
+    // retry until no other update slipped in between reading the contents and swapping in the copy
+    while(true){
+      val curr = contents.get()
+      val updated = new Array[T](curr.length + ts.length)
+      Array.copy(curr, 0, updated, 0, curr.length)
+      for(i <- 0 until ts.length)
+        updated(curr.length + i) = ts(i)
+      if(contents.compareAndSet(curr, updated))
+        return
+    }
+  }
+  
+  
+  /**
+   * Delete the first n items from the list
+   *
+   * @param newStart the number of leading items to delete; if the list is shorter, everything is deleted
+   * @return the items that were actually deleted, in their original order
+   * @throws IllegalArgumentException if newStart is negative
+   */
+  def trunc(newStart: Int): Seq[T] = {
+    if(newStart < 0)
+      throw new IllegalArgumentException("Starting index must be non-negative.");
+    var deleted: Array[T] = null
+    var done = false
+    while(!done) {
+      val curr = contents.get()
+      // never delete more than there is: keeps the copy source index valid (also for an empty list)
+      // and keeps the returned array free of trailing nulls
+      val numDeleted = min(newStart, curr.length)
+      val updated = new Array[T](curr.length - numDeleted)
+      Array.copy(curr, numDeleted, updated, 0, updated.length)
+      if(contents.compareAndSet(curr, updated)) {
+        deleted = new Array[T](numDeleted)
+        Array.copy(curr, 0, deleted, 0, numDeleted)
+        done = true
+      }
+    }
+    deleted
+  }
+  
+  /**
+   * Get a consistent view of the sequence
+   */
+  def view: Array[T] = contents.get()
+  
+  /**
+   * Nicer toString method: lists the elements instead of printing the identity of the array
+   */
+  override def toString(): String = view.mkString("[", ", ", "]")
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+The log management system for Kafka.
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,26 @@
+package kafka.message
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import scala.Math
+
+/**
+ * An InputStream that reads from the remaining bytes of the given ByteBuffer. Reading advances the
+ * position of the buffer.
+ */
+class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
+  /**
+   * Read the next byte as a value in 0..255, or -1 if the buffer has no bytes remaining
+   */
+  override def read():Int  = {
+    if(buffer.hasRemaining)
+      buffer.get() & 0xFF
+    else
+      -1
+  }
+
+  /**
+   * Read up to len bytes into bytes, starting at index off
+   *
+   * @return the number of bytes read; 0 if len is 0; -1 if the buffer has no bytes remaining
+   */
+  override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
+    if(len == 0)
+      0 // the InputStream contract: a zero length read yields 0, even at the end of the stream
+    else if(!buffer.hasRemaining)
+      -1
+    else {
+      // Read only what's left
+      val realLen = math.min(len, buffer.remaining())
+      buffer.get(bytes, off, realLen)
+      realLen
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import scala.collection.mutable
+import org.apache.log4j.Logger
+import kafka.common.{InvalidMessageSizeException, ErrorMapping}
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+import kafka.utils.IteratorTemplate
+
+/**
+ * A sequence of messages stored in a byte buffer
+ *
+ * There are two ways to create a ByteBufferMessageSet
+ *
+ * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
+ *
+ * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
+ * 
+ */
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+                           private val initialOffset: Long = 0L,
+                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+  private val logger = Logger.getLogger(getClass())  
+  private var validByteCount = -1L
+  private var shallowValidByteCount = -1L
+  private var deepValidByteCount = -1L
+
+  /**
+   * Build a message set by serializing the given messages into a newly allocated buffer.
+   * With NoCompressionCodec each message is serialized in turn; with any other codec the messages
+   * are first combined into one message by CompressionUtils.compress and that message is serialized.
+   * The initial offset is 0 and the error code is ErrorMapping.NoError.
+   */
+  def this(compressionCodec: CompressionCodec, messages: Message*) {
+    this(
+      compressionCodec match {
+        case NoCompressionCodec =>
+          val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+          for (message <- messages) {
+            message.serializeTo(buffer)
+          }
+          // reset the position to 0 so the set is read from the first serialized byte
+          buffer.rewind
+          buffer
+        case _ =>
+          val message = CompressionUtils.compress(messages, compressionCodec)
+          val buffer = ByteBuffer.allocate(message.serializedSize)
+          message.serializeTo(buffer)
+          buffer.rewind
+          buffer
+      }, 0L, ErrorMapping.NoError)
+  }
+
+  /** Build an uncompressed message set from the given messages. */
+  def this(messages: Message*) {
+    this(NoCompressionCodec, messages: _*)
+  }
+
+  def getInitialOffset = initialOffset
+
+  def getBuffer = buffer
+
+  def getErrorCode = errorCode
+
+  def serialized(): ByteBuffer = buffer
+
+  def validBytes: Long = deepValidBytes
+  
+  /**
+   * The offset attached to the last message yielded by the deep iterator, minus initialOffset.
+   * Returns 0 when the iterator yields no message at all.
+   */
+  def shallowValidBytes: Long = {
+    if(shallowValidByteCount < 0) {
+      val iter = deepIterator
+      while(iter.hasNext)
+        shallowValidByteCount = iter.next.offset
+    }
+    // With no messages the cache is still -1; report 0 instead of a negative byte count.
+    if(shallowValidByteCount < initialOffset) 0L else shallowValidByteCount - initialOffset
+  }
+  
+  /**
+   * The byte position recorded by a full pass of the deep iterator (counting starts at
+   * initialOffset). The value is cached once it is non-negative.
+   */
+  def deepValidBytes: Long = {
+    // deepIterator stores its position in deepValidByteCount when it runs out of input,
+    // so draining it is all that is needed to fill the cache
+    if (deepValidByteCount < 0)
+      deepIterator.foreach(_ => ())
+    deepValidByteCount
+  }
+
+  /**
+   * Write the messages in this set to the given channel.
+   * Writes from a duplicate of the buffer, so this set's own buffer position is left untouched.
+   * NOTE(review): the offset and size arguments are not used here -- each call offers the
+   * duplicate's full remaining content to the channel; confirm callers do not rely on them.
+   *
+   * @return the number of bytes written, as reported by channel.write
+   */
+  def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
+    channel.write(buffer.duplicate)
+  
+  override def iterator: Iterator[MessageAndOffset] = deepIterator
+
+  /**
+   * Iterate over every message in the buffer, replacing each compressed message by the messages
+   * obtained from CompressionUtils.decompress. The error code of this set is first handed to
+   * ErrorMapping.maybeThrowException.
+   *
+   * The offset paired with each message is the running byte position, which starts at
+   * initialOffset and grows by 4 + size for every top-level message consumed.
+   */
+  private def deepIterator(): Iterator[MessageAndOffset] = {
+    ErrorMapping.maybeThrowException(errorCode)
+    new IteratorTemplate[MessageAndOffset] {
+      var topIter = buffer.slice()
+      var currValidBytes = initialOffset
+      var innerIter:Iterator[MessageAndOffset] = null
+      var lastMessageSize = 0L
+
+      def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
+
+      /** Read the next top-level message, descending into it when it is compressed. */
+      def makeNextOuter: MessageAndOffset = {
+        // fewer than 4 bytes left: not even a size field, so iteration is over
+        if (topIter.remaining < 4) {
+          deepValidByteCount = currValidBytes
+          return allDone()
+        }
+        val size = topIter.getInt()
+        lastMessageSize = size
+
+        if(logger.isTraceEnabled) {
+          logger.trace("Remaining bytes in iterator = " + topIter.remaining)
+          logger.trace("size of data = " + size)
+        }
+        if(size < 0 || topIter.remaining < size) {
+          deepValidByteCount = currValidBytes
+          // currValidBytes starts at initialOffset, so equality means that not a single
+          // message could be read in full
+          if (currValidBytes == initialOffset || size < 0)
+            // parenthesized so that format applies to the whole concatenated message
+            throw new InvalidMessageSizeException(("invalid message size: %d only received bytes: %d " +
+              " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption ")
+                .format(size, topIter.remaining, currValidBytes))
+          return allDone()
+        }
+        val message = topIter.slice()
+        message.limit(size)
+        topIter.position(topIter.position + size)
+        val newMessage = new Message(message)
+        newMessage.compressionCodec match {
+          case NoCompressionCodec =>
+            if(logger.isDebugEnabled)
+              logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
+            innerIter = null
+            currValidBytes += 4 + size
+            new MessageAndOffset(newMessage, currValidBytes)
+          case _ =>
+            if(logger.isDebugEnabled)
+              logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+            innerIter = CompressionUtils.decompress(newMessage).deepIterator
+            makeNext()
+        }
+      }
+
+      override def makeNext(): MessageAndOffset = {
+        if(logger.isDebugEnabled)
+          logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
+        if(innerDone) {
+          makeNextOuter
+        } else {
+          val messageAndOffset = innerIter.next
+          // the wrapper's bytes are only counted once its last inner message is handed out
+          if(!innerIter.hasNext)
+            currValidBytes += 4 + lastMessageSize
+          new MessageAndOffset(messageAndOffset.message, currValidBytes)
+        }
+      }
+    }
+  }
+
+  def sizeInBytes: Long = buffer.limit
+  
+  /** Render as "ByteBufferMessageSet(" followed by each entry plus ", ", closed by ")". */
+  override def toString: String = {
+    val text = new StringBuilder()
+    text.append("ByteBufferMessageSet(")
+    this.foreach { entry =>
+      text.append(entry)
+      text.append(", ")
+    }
+    text.append(")")
+    text.toString
+  }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ByteBufferMessageSet =>
+        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+      case _ => false
+    }
+  }
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
+
+  override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionCodec.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.message
+
+object CompressionCodec {
+  /**
+   * Map a numeric codec id to its codec object: 0 is NoCompressionCodec, 1 is GZIPCompressionCodec.
+   * Id 1 always yields GZIPCompressionCodec, never DefaultCompressionCodec, although both declare codec 1.
+   *
+   * @throws kafka.common.UnknownCodecException for any other id
+   */
+  def getCompressionCodec(codec: Int): CompressionCodec = {
+    codec match {
+      case 0 => NoCompressionCodec
+      case 1 => GZIPCompressionCodec
+      case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
+    }
+  }
+}
+
+/** A compression codec, identified by the integer id returned by codec. */
+sealed trait CompressionCodec { def codec: Int }
+
+// Declares the same id as GZIPCompressionCodec, so the two cannot be told apart by id.
+case object DefaultCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object NoCompressionCodec extends CompressionCodec { val codec = 0 }

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io.ByteArrayOutputStream
+import java.io.IOException
+import java.io.InputStream
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+import java.nio.ByteBuffer
+import org.apache.log4j.Logger
+
+/**
+ * Helpers that turn a group of messages into one GZIP-compressed message and back.
+ */
+object CompressionUtils {
+  private val logger = Logger.getLogger(getClass)
+
+  /** Compress the given messages into a single message using DefaultCompressionCodec. */
+  def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec)
+
+  /**
+   * Serialize the given messages back to back, GZIP the result and wrap it in one message that
+   * is created with the requested codec.
+   *
+   * @throws kafka.common.UnknownCodecException if the codec is neither DefaultCompressionCodec nor GZIPCompressionCodec
+   * @throws IOException if writing to the GZIP stream fails (the failure is logged first)
+   */
+  def compress(messages: Iterable[Message], compressionCodec: CompressionCodec):Message = compressionCodec match {
+    // both codecs are handled by the same GZIP implementation
+    case DefaultCompressionCodec | GZIPCompressionCodec =>
+      new Message(gzip(messages), compressionCodec)
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+  }
+
+  /**
+   * Decompress the payload of the given message into the message set it contains.
+   *
+   * @throws kafka.common.UnknownCodecException if the message's codec is neither DefaultCompressionCodec nor GZIPCompressionCodec
+   * @throws IOException if reading from the GZIP stream fails (the failure is logged first)
+   */
+  def decompress(message: Message): ByteBufferMessageSet = message.compressionCodec match {
+    case DefaultCompressionCodec | GZIPCompressionCodec =>
+      // wrap gives a buffer positioned at 0 over exactly the decompressed bytes
+      val outputBuffer = ByteBuffer.wrap(gunzip(message.payload))
+      new ByteBufferMessageSet(outputBuffer)
+    case _ =>
+      throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
+  }
+
+  /** GZIP the serialized form of the given messages and return the compressed bytes. */
+  private def gzip(messages: Iterable[Message]): Array[Byte] = {
+    val outputStream = new ByteArrayOutputStream()
+    val gzipOutput = new GZIPOutputStream(outputStream)
+    val messageSetSize = MessageSet.messageSetSize(messages)
+    if(logger.isDebugEnabled)
+      logger.debug("Allocating message byte buffer of size = " + messageSetSize)
+
+    val messageByteBuffer = ByteBuffer.allocate(messageSetSize)
+    messages.foreach(m => m.serializeTo(messageByteBuffer))
+
+    try {
+      // array exposes the whole backing array regardless of the buffer's position
+      gzipOutput.write(messageByteBuffer.array)
+    } catch {
+      case e: IOException =>
+        logger.error("Error while writing to the GZIP output stream", e)
+        throw e
+    } finally {
+      // finally runs on both paths, so the streams are closed here only
+      gzipOutput.close()
+      outputStream.close()
+    }
+    outputStream.toByteArray
+  }
+
+  /** Inflate the GZIP data held in the given buffer and return the decompressed bytes. */
+  private def gunzip(compressed: ByteBuffer): Array[Byte] = {
+    val outputStream = new ByteArrayOutputStream
+    val gzipIn = new GZIPInputStream(new ByteBufferBackedInputStream(compressed))
+    val intermediateBuffer = new Array[Byte](1024)
+
+    try {
+      // keep copying chunks until read reports that nothing more was read
+      Iterator.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+        outputStream.write(intermediateBuffer, 0, dataRead)
+      }
+    } catch {
+      case e: IOException =>
+        logger.error("Error while reading from the GZIP input stream", e)
+        throw e
+    } finally {
+      gzipIn.close()
+      outputStream.close()
+    }
+    outputStream.toByteArray
+  }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+import org.apache.log4j.Logger
+
+import kafka._
+import kafka.message._
+import kafka.utils._
+
+/**
+ * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
+ * will fail on an immutable message set. An optional limit and offset can be applied to the message set
+ * which will control the offset into the file and the effective length into the file from which
+ * messages will be read
+ */
+@nonthreadsafe
+class FileMessageSet private[kafka](private[message] val channel: FileChannel,
+                                    private[message] val offset: Long,
+                                    private[message] val limit: Long,
+                                    val mutable: Boolean,
+                                    val needRecover: AtomicBoolean) extends MessageSet {
+  
+  private val setSize = new AtomicLong()
+  private val setHighWaterMark = new AtomicLong()
+  private val logger = Logger.getLogger(classOf[FileMessageSet])
+  
+  if(mutable) {
+    if(limit < Long.MaxValue || offset > 0)
+      throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
+
+    if (needRecover.get) {
+      // set the file position to the end of the file for appending messages
+      val startMs = System.currentTimeMillis
+      val truncated = recover()
+      logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
+                " seconds. " + truncated + " bytes truncated.")
+    }
+    else {
+      setSize.set(channel.size())
+      setHighWaterMark.set(sizeInBytes)
+      channel.position(channel.size)
+    }
+  } else {
+    setSize.set(scala.math.min(channel.size(), limit) - offset)
+    setHighWaterMark.set(sizeInBytes)
+    if(logger.isDebugEnabled)
+      logger.debug("initializing high water mark in immutable mode: " + highWaterMark)
+  }
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(channel: FileChannel, mutable: Boolean) = 
+    this(channel, 0, Long.MaxValue, mutable, new AtomicBoolean(false))
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(file: File, mutable: Boolean) = 
+    this(Utils.openChannel(file, mutable), mutable)
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(channel: FileChannel, mutable: Boolean, needRecover: AtomicBoolean) = 
+    this(channel, 0, Long.MaxValue, mutable, needRecover)
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(file: File, mutable: Boolean, needRecover: AtomicBoolean) = 
+    this(Utils.openChannel(file, mutable), mutable, needRecover)
+  
+  
+  /**
+   * Return a message set which is a view into this set starting from the given offset and with the given size limit.
+   */
+  def read(readOffset: Long, size: Long): MessageSet = {
+    new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+      false, new AtomicBoolean(false))
+  }
+  
+  /**
+   * Write some of this set to the given channel, return the amount written.
+   * The transfer starts at file position (offset + writeOffset) and asks for at most
+   * min(size, sizeInBytes) bytes.
+   */
+  def writeTo(destChannel: WritableByteChannel, writeOffset: Long, size: Long): Long = 
+    channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel)
+  
+  /**
+   * Get an iterator over the messages in the set.
+   * Reading starts at this set's offset. Iteration ends at the first size field or message body
+   * that cannot be read in full, or at a size below Message.MinHeaderSize. The offset paired with
+   * each message is the file position just past that message.
+   */
+  override def iterator: Iterator[MessageAndOffset] = {
+    new IteratorTemplate[MessageAndOffset] {
+      // file position of the next size field to read
+      var location = offset
+      
+      override def makeNext(): MessageAndOffset = {
+        // read the size of the item
+        val sizeBuffer = ByteBuffer.allocate(4)
+        channel.read(sizeBuffer, location)
+        // a size field that could not be filled completely ends the iteration
+        if(sizeBuffer.hasRemaining)
+          return allDone()
+        
+        sizeBuffer.rewind()
+        val size: Int = sizeBuffer.getInt()
+        if (size < Message.MinHeaderSize)
+          return allDone()
+        
+        // read the item itself
+        val buffer = ByteBuffer.allocate(size)
+        channel.read(buffer, location + 4)
+        // a message body that could not be filled completely ends the iteration
+        if(buffer.hasRemaining)
+          return allDone()
+        buffer.rewind()
+        
+        // increment the location and return the item
+        location += size + 4
+        new MessageAndOffset(new Message(buffer), location)
+      }
+    }
+  }
+  
+  /**
+   * The number of bytes taken up by this file set
+   */
+  def sizeInBytes(): Long = setSize.get()
+  
+  /**
+    * The high water mark
+    */
+  def highWaterMark(): Long = setHighWaterMark.get()
+
+  def checkMutable(): Unit = {
+    if(!mutable)
+      throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
+  }
+  
+  /**
+   * Append the given messages to this message set by writing them to the channel, then add the
+   * number of bytes written to the set size. The high water mark is not changed here.
+   *
+   * NOTE(review): every pass of the loop calls writeTo with offset 0 and the full size, so after
+   * a short write the next pass would appear to start again from the first byte -- confirm that
+   * writeTo never writes partially for the channels used here.
+   *
+   * @throws IllegalStateException if this message set is not mutable
+   */
+  def append(messages: MessageSet): Unit = {
+    checkMutable()
+    var written = 0L
+    while(written < messages.sizeInBytes)
+      written += messages.writeTo(channel, 0, messages.sizeInBytes)
+    setSize.getAndAdd(written)
+  }
+ 
+  /**
+   * Commit all written data to the physical disk
+   */
+  def flush() = {
+    checkMutable()
+    val startTime = SystemTime.milliseconds
+    channel.force(true)
+    val elapsedTime = SystemTime.milliseconds - startTime
+    LogFlushStats.recordFlushRequest(elapsedTime)
+    if (logger.isDebugEnabled)
+      logger.debug("flush time " + elapsedTime)
+    setHighWaterMark.set(sizeInBytes)
+    if(logger.isDebugEnabled)
+      logger.debug("flush high water mark:" + highWaterMark)
+  }
+  
+  /**
+   * Close this message set
+   */
+  def close() = {
+    if(mutable)
+      flush()
+    channel.close()
+  }
+  
+  /**
+   * Recover log up to the last complete entry. Truncate off any bytes from any incomplete messages written.
+   * Messages are validated one after another from the start of the file; the channel is truncated
+   * at the end of the last valid one, the size and high water mark are reset to that point and
+   * needRecover is cleared.
+   *
+   * @return the number of bytes truncated from the end of the file
+   * @throws IllegalStateException if this message set is not mutable
+   */
+  def recover(): Long = {
+    checkMutable()
+    val len = channel.size
+    val buffer = ByteBuffer.allocate(4)
+    var validUpTo: Long = 0
+    var next = 0L
+    do {
+      next = validateMessage(channel, validUpTo, len, buffer)
+      if(next >= 0)
+        validUpTo = next
+    } while(next >= 0)
+    channel.truncate(validUpTo)
+    setSize.set(validUpTo)
+    setHighWaterMark.set(validUpTo)
+    // log at the level that the guard checks
+    if(logger.isDebugEnabled)
+      logger.debug("recover high water mark:" + highWaterMark)
+    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
+    channel.position(validUpTo)
+    needRecover.set(false)    
+    len - validUpTo
+  }
+  
+  /**
+   * Read, validate, and discard a single message that starts at the given file position.
+   *
+   * @param channel the channel to read from
+   * @param start file position of the message's 4 byte size field
+   * @param len the file length; a message that would end beyond it is rejected
+   * @param buffer scratch buffer that receives the size field (recover() passes a 4 byte buffer)
+   * @return the file position just past the message if it is valid, otherwise -1
+   * @throws IllegalStateException if the channel reports end of file while the message is being read
+   */
+  private def validateMessage(channel: FileChannel, start: Long, len: Long, buffer: ByteBuffer): Long = {
+    buffer.rewind()
+    var read = channel.read(buffer, start)
+    // fewer than 4 bytes read: no complete size field at this position
+    if(read < 4)
+      return -1
+    
+    // check that we have sufficient bytes left in the file
+    val size = buffer.getInt(0)
+    if (size < Message.MinHeaderSize)
+      return -1
+    
+    val next = start + 4 + size
+    if(next > len)
+      return -1
+    
+    // read the message
+    val messageBuffer = ByteBuffer.allocate(size)
+    var curr = start + 4
+    // a single read may return fewer bytes than requested, so loop until the buffer is full
+    while(messageBuffer.hasRemaining) {
+      read = channel.read(messageBuffer, curr)
+      if(read < 0)
+        throw new IllegalStateException("File size changed during recovery!")
+      else
+        curr += read
+    }
+    messageBuffer.rewind()
+    val message = new Message(messageBuffer)
+    if(!message.isValid)
+      return -1
+    else
+      next
+  }
+  
+}
+
+trait LogFlushStatsMBean {
+  def getFlushesPerSecond: Double
+  def getAvgFlushMs: Double
+  def getMaxFlushMs: Double
+  def getNumFlushes: Long
+}
+
+@threadsafe
+class LogFlushStats extends LogFlushStatsMBean {
+  private val flushRequestStats = new SnapshotStats
+
+  def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
+
+  def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
+
+  def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
+
+  def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
+
+  def getNumFlushes: Long = flushRequestStats.getNumRequests
+}
+
+object LogFlushStats {
+  private val logger = Logger.getLogger(getClass())
+  private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
+  private val stats = new LogFlushStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, LogFlushStatsMBeanName))
+
+  def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/InvalidMessageException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+/**
+ * Indicates that a message failed its checksum and is corrupt
+ */
+class InvalidMessageException extends RuntimeException

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.nio.channels._
+import java.util.zip.CRC32
+import java.util.UUID
+import kafka.utils._
+import kafka.common.UnknownMagicByteException
+
+/**
+ * Constants and helpers describing the byte layout of a message header
+ */
+object Message {
+  val MagicVersion1: Byte = 0
+  val MagicVersion2: Byte = 1
+  val CurrentMagicValue: Byte = 1
+  val MagicOffset = 0
+  val MagicLength = 1
+  val AttributeOffset = MagicOffset + MagicLength
+  val AttributeLength = 1
+  /**
+   * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+   * 0 is reserved to indicate no compression
+   */
+  val CompressionCodeMask: Int = 0x03  //
+
+
+  val NoCompression:Int = 0
+
+  /**
+   * Computes the offset of the CRC field based on the magic byte
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1
+   *              0 for the header without an attributes byte (CRC at offset 1)
+   *              1 for the header with an attributes byte (CRC at offset 2)
+   * @throws UnknownMagicByteException for any other magic value
+  */
+  def crcOffset(magic: Byte): Int = magic match {
+    case MagicVersion1 => MagicOffset + MagicLength
+    case MagicVersion2 => AttributeOffset + AttributeLength
+    case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
+  }
+  
+  val CrcLength = 4
+
+  /**
+   * Computes the offset to the message payload based on the magic byte
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1
+   *              0 for the header without an attributes byte (payload at offset 5)
+   *              1 for the header with an attributes byte (payload at offset 6)
+   */
+  def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
+
+  /**
+   * Computes the size of the message header based on the magic byte; this is the same value
+   * as the payload offset
+   * @param magic Specifies the magic byte value. Possible values are 0 and 1
+   *              0 for the header without an attributes byte (5 bytes)
+   *              1 for the header with an attributes byte (6 bytes)
+   */
+  def headerSize(magic: Byte): Int = payloadOffset(magic)
+
+  /**
+   * Size of the header for magic byte 0. This is the minimum size of any message header
+   */
+  val MinHeaderSize = headerSize(0);
+}
+
+/**
+ * A message. The format of an N byte message is the following:
+ *
+ * If magic byte is 0
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 4 byte CRC32 of the payload
+ *
+ * 3. N - 5 byte payload
+ *
+ * If magic byte is 1
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+ *
+ * 3. 4 byte CRC32 of the payload
+ *
+ * 4. N - 6 byte payload
+ * 
+ */
+class Message(val buffer: ByteBuffer) {
+  
+  import kafka.message.Message._
+    
+  
+  private def this(checksum: Long, bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+    this(ByteBuffer.allocate(Message.headerSize(Message.CurrentMagicValue) + bytes.length))
+    buffer.put(CurrentMagicValue)
+    var attributes:Byte = 0
+    if (compressionCodec.codec > 0) {
+      attributes =  (attributes | (Message.CompressionCodeMask & compressionCodec.codec)).toByte
+    }
+    buffer.put(attributes)
+    Utils.putUnsignedInt(buffer, checksum)
+    buffer.put(bytes)
+    buffer.rewind()
+  }
+
+  def this(checksum:Long, bytes:Array[Byte]) = this(checksum, bytes, NoCompressionCodec)
+  
+  def this(bytes: Array[Byte], compressionCodec: CompressionCodec) = {
+    //Note: we're not crc-ing the attributes header, so we're susceptible to bit-flipping there
+    this(Utils.crc32(bytes), bytes, compressionCodec)
+  }
+
+  def this(bytes: Array[Byte]) = this(bytes, NoCompressionCodec)
+  
+  def size: Int = buffer.limit
+  
+  def payloadSize: Int = size - headerSize(magic)
+  
+  def magic: Byte = buffer.get(MagicOffset)
+  
+  def attributes: Byte = buffer.get(AttributeOffset)
+  
+  /**
+   * The codec recorded in this message. Magic 0 messages are always NoCompressionCodec; magic 1
+   * messages keep the codec id in the bits of the attributes byte selected by CompressionCodeMask.
+   */
+  def compressionCodec:CompressionCodec = magic match {
+    case MagicVersion1 => NoCompressionCodec
+    case MagicVersion2 =>
+      val codecId = attributes & CompressionCodeMask
+      CompressionCodec.getCompressionCodec(codecId)
+    case unknown => throw new RuntimeException("Invalid magic byte " + unknown)
+  }
+
+  def checksum: Long = Utils.getUnsignedInt(buffer, crcOffset(magic))
+  
+  /**
+   * A view of the bytes after the header, positioned at 0 and limited to payloadSize bytes.
+   * It is derived from a duplicate, so this message's own buffer position is not moved.
+   */
+  def payload: ByteBuffer = {
+    val afterHeader = buffer.duplicate
+    afterHeader.position(headerSize(magic))
+    val body = afterHeader.slice()
+    body.limit(payloadSize)
+    body.rewind()
+    body
+  }
+  
+  /**
+   * True if the stored checksum equals Utils.crc32 over the payloadSize bytes that follow the header.
+   * The bytes are read through buffer.array, so the buffer needs an accessible backing array.
+   * NOTE(review): the start index adds buffer.position, while magic and checksum are read at
+   * absolute indices -- this looks like it assumes a buffer position of 0; confirm.
+   */
+  def isValid: Boolean =
+    checksum == Utils.crc32(buffer.array, buffer.position + buffer.arrayOffset + payloadOffset(magic), payloadSize)
+
+  def serializedSize: Int = 4 /* int size*/ + buffer.limit
+   
+  def serializeTo(serBuffer:ByteBuffer) = {
+    serBuffer.putInt(buffer.limit)
+    serBuffer.put(buffer.duplicate)
+  }
+
+  override def toString(): String = 
+    "message(magic = %d, attributes = %d, crc = %d, payload = %s)".format(magic, attributes, checksum, payload)
+  
+  override def equals(any: Any): Boolean = {
+    any match {
+      case that: Message => size == that.size && attributes == that.attributes && checksum == that.checksum &&
+        payload == that.payload && magic == that.magic
+      case _ => false
+    }
+  }
+  
+  override def hashCode(): Int = buffer.hashCode
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageAndOffset.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.message
+
+/**
+ * A message paired with the offset of the next message. MessageSet yields these when it is iterated
+ */
+case class MessageAndOffset(message: Message, offset: Long)

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageLengthException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+/**
+ * Indicates the presence of a message that exceeds the maximum acceptable
+ * length (whatever that happens to be)
+ */
+class MessageLengthException(message: String) extends RuntimeException(message)

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.nio.channels._
+
+/**
+ * Helper constants and size calculations for message sets
+ */
+object MessageSet {
+
+  /** Bytes added in front of every message in a set (the size field of an entry) */
+  val LogOverhead = 4
+
+  /** A message set backed by a zero-length buffer */
+  val Empty: MessageSet = new ByteBufferMessageSet(ByteBuffer.allocate(0))
+
+  /**
+   * The size in bytes of a message set containing the given messages,
+   * i.e. the sum of the entry sizes of all of them
+   */
+  def messageSetSize(messages: Iterable[Message]): Int =
+    messages.foldLeft(0)((total, message) => total + entrySize(message))
+
+  /**
+   * The size in bytes of a message set containing the given java list of messages
+   */
+  def messageSetSize(messages: java.util.List[Message]): Int = {
+    var total = 0
+    val remaining = messages.iterator
+    while(remaining.hasNext)
+      total += entrySize(remaining.next)
+    total
+  }
+
+  /**
+   * The size of a single size-delimited entry in a message set:
+   * the message itself plus the LogOverhead bytes in front of it
+   */
+  def entrySize(message: Message): Int = LogOverhead + message.size
+
+}
+
+/**
+ * A set of messages. A message set has a fixed serialized form, though the container
+ * for the bytes could be either in-memory or on disk. The format of each message is
+ * as follows:
+ * 4 byte size containing an integer N
+ * N message bytes as described in the message class
+ */
+abstract class MessageSet extends Iterable[MessageAndOffset] {
+
+  /**
+   * Write the messages in this set to the given channel, starting at the given byte offset.
+   * Fewer bytes than the complete set may be written, but never more than maxSize.
+   * Returns the number of bytes written.
+   */
+  def writeTo(channel: WritableByteChannel, offset: Long, maxSize: Long): Long
+
+  /**
+   * An iterator over the messages in this set
+   */
+  def iterator: Iterator[MessageAndOffset]
+
+  /**
+   * The total size of this message set in bytes
+   */
+  def sizeInBytes: Long
+
+  /**
+   * Check every message in the set with Message.isValid and throw an
+   * InvalidMessageException at the first message that fails the check.
+   */
+  def validate(): Unit =
+    foreach { entry =>
+      if(!entry.message.isValid)
+        throw new InvalidMessageException
+    }
+
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+Messages and everything related to them.
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+/**
+ * A size-delimited transmission read from a channel: a 4 byte size followed by that many
+ * bytes of content. Sizes that are not positive or that exceed maxSize are rejected before
+ * any content buffer is allocated.
+ *
+ * @param maxSize the largest content size (in bytes) that will be accepted
+ */
+@nonthreadsafe
+private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive {
+
+  // holds the 4 byte size prefix while it is being read
+  private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
+  // allocated only once the size prefix has been read completely; null until then
+  private var contentBuffer: ByteBuffer = null
+
+  /** Create a receive that accepts content of up to Int.MaxValue bytes */
+  def this() = this(Int.MaxValue)
+
+  /** Set to true once all content bytes have been read */
+  var complete: Boolean = false
+
+  /**
+   * Get the content buffer for this transmission. expectComplete() is called first.
+   */
+  def buffer: ByteBuffer = {
+    expectComplete()
+    contentBuffer
+  }
+
+  /**
+   * Read the bytes of this transmission from the given channel. May need to be called
+   * several times until complete is true.
+   *
+   * @return the number of bytes read by this call, counting both size and content bytes
+   * @throws InvalidRequestException if the size prefix is not positive or exceeds maxSize
+   */
+  def readFrom(channel: ReadableByteChannel): Int = {
+    expectIncomplete()
+    var read = 0
+    // have we read the request size yet?
+    if(sizeBuffer.remaining > 0)
+      read += Utils.read(channel, sizeBuffer)
+    // have we allocated the request buffer yet?
+    if(contentBuffer == null && !sizeBuffer.hasRemaining) {
+      sizeBuffer.rewind()
+      val size = sizeBuffer.getInt()
+      if(size <= 0 || size > maxSize)
+        throw new InvalidRequestException(size + " is not a valid message size")
+      contentBuffer = byteBufferAllocate(size)
+    }
+    // if we have a buffer read some stuff into it
+    if(contentBuffer != null) {
+      // accumulate, so size bytes read earlier in this same call are still counted
+      read += Utils.read(channel, contentBuffer)
+      // did we get everything?
+      if(!contentBuffer.hasRemaining) {
+        contentBuffer.rewind()
+        complete = true
+      }
+    }
+    read
+  }
+
+  /**
+   * Allocate the content buffer, wrapping an OutOfMemoryError in a RuntimeException whose
+   * message carries the requested size. Any other throwable propagates unchanged.
+   */
+  private def byteBufferAllocate(size: Int): ByteBuffer =
+    try {
+      ByteBuffer.allocate(size)
+    }
+    catch {
+      case e: OutOfMemoryError =>
+        throw new RuntimeException("OOME with size " + size, e)
+    }
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+/**
+ * A size-delimited transmission written to a channel: a 4 byte size (the limit of the
+ * given buffer) followed by the contents of the buffer.
+ *
+ * @param buffer the content to send
+ */
+@nonthreadsafe
+private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
+
+  // the 4 byte size prefix, filled in once at construction and never replaced
+  private val sizeBuffer = ByteBuffer.allocate(4)
+
+  sizeBuffer.putInt(buffer.limit)
+  sizeBuffer.rewind()
+
+  /** Set to true once both the size prefix and the content have been written */
+  var complete: Boolean = false
+
+  /** Create a send over a freshly allocated buffer of the given size */
+  def this(size: Int) = this(ByteBuffer.allocate(size))
+
+  /**
+   * Create a send carrying the given request: the 2 byte request id followed by the bytes
+   * the request writes for itself.
+   */
+  def this(request: Request) = {
+    this(request.sizeInBytes + 2)
+    buffer.putShort(request.id)
+    request.writeTo(buffer)
+    buffer.rewind()
+  }
+
+  /**
+   * Write as much of this transmission as the channel accepts. May need to be called
+   * several times until complete is true.
+   *
+   * @return the number of bytes written by this call, counting both size and content bytes
+   */
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    // try to write the size if we haven't already
+    if(sizeBuffer.hasRemaining)
+      written += channel.write(sizeBuffer)
+    // try to write the actual buffer itself
+    if(!sizeBuffer.hasRemaining && buffer.hasRemaining)
+      written += channel.write(buffer)
+    // done only when the size prefix is out as well; an empty content buffer must not
+    // mark the send complete while part of the prefix is still unwritten
+    if(!sizeBuffer.hasRemaining && !buffer.hasRemaining)
+      complete = true
+
+    written
+  }
+
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+import java.nio.channels._
+import kafka.utils._
+
+/**
+ * A transmission that writes the contents of a byte buffer to a channel as they are,
+ * without any framing.
+ *
+ * @param buffer the content to send
+ */
+@nonthreadsafe
+private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send {
+
+  /** Set to true once the buffer has no bytes left to write */
+  var complete: Boolean = false
+
+  /** Create a send over a freshly allocated buffer of the given size */
+  def this(size: Int) = this(ByteBuffer.allocate(size))
+
+  /**
+   * Write as much of the buffer as the channel accepts and return the number of bytes
+   * written by this call.
+   */
+  def writeTo(channel: WritableByteChannel): Int = {
+    expectIncomplete()
+    val written = channel.write(buffer)
+    if(!buffer.hasRemaining)
+      complete = true
+    written
+  }
+
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/ConnectionConfig.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+/**
+ * Settings for a connection. Implementations must supply host and port; the remaining
+ * members come with defaults that can be overridden.
+ * NOTE(review): the buffer sizes default to -1, which presumably means "do not set the
+ * size explicitly" -- confirm where these values are applied.
+ */
+trait ConnectionConfig {
+  /** The host of the connection */
+  val host: String
+  /** The port of the connection */
+  val port: Int
+  /** The send buffer size, -1 unless overridden */
+  val sendBufferSize: Int = -1
+  /** The receive buffer size, -1 unless overridden */
+  val receiveBufferSize: Int = -1
+  /** The tcpNoDelay setting, true unless overridden */
+  val tcpNoDelay: Boolean = true
+  /** The keepAlive setting, false unless overridden */
+  val keepAlive: Boolean = false
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Handler.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+private[kafka] object Handler {
+  
+  /**
+   * A request handler is a function that turns an incoming
+   * transmission (a Receive) into an optional outgoing transmission (a Send).
+   * NOTE(review): None presumably means that nothing is sent back -- confirm
+   * against the code that invokes handlers.
+   */
+  type Handler = Receive => Option[Send]
+  
+  /**
+   * A handler mapping finds the right Handler function for a given request.
+   * NOTE(review): the Short is presumably the request id -- confirm against
+   * the code that invokes the mapping.
+   */
+  type HandlerMapping = (Short, Receive) => Handler
+
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/InvalidRequestException.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+/**
+ * A RuntimeException signalling an invalid request. The message is passed on to
+ * RuntimeException and is also exposed as the public member `message`.
+ */
+class InvalidRequestException(val message: String) extends RuntimeException(message) {
+  
+  // convenience constructor that uses an empty message
+  def this() = this("")
+  
+}

Added: incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Request.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio._
+
+/**
+ * Base class for requests, each carrying a Short id.
+ * NOTE(review): sizeInBytes presumably has to equal the number of bytes writeTo puts
+ * into the buffer, since a caller may allocate the buffer from it -- confirm.
+ */
+private[kafka] abstract class Request(val id: Short) {
+
+  /** The size of this request in bytes, as reported by the implementation */
+  def sizeInBytes: Int
+  
+  /** Write this request into the given buffer */
+  def writeTo(buffer: ByteBuffer): Unit
+  
+}