You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/10 22:47:24 UTC

svn commit: r1383105 - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaapi/message/ c...

Author: junrao
Date: Mon Sep 10 20:47:23 2012
New Revision: 1383105

URL: http://svn.apache.org/viewvc?rev=1383105&view=rev
Log:
remove errorcode from ByteBufferMessageSet; patched by Swapnil Ghike; reviewed by Jay Kreps and Jun Rao; KAFKA-458

Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Mon Sep 10 20:47:23 2012
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
-import kafka.common.ErrorMapping;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -136,7 +135,6 @@ public class KafkaETLContext {
 
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
-                if ( hasError(msgSet)) return false;
                 _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
@@ -249,36 +247,6 @@ public class KafkaETLContext {
         return range;
     }
     
-    /**
-     * Called by the default implementation of {@link #map} to check error code
-     * to determine whether to continue.
-     */
-    protected boolean hasError(ByteBufferMessageSet messages)
-            throws IOException {
-        short errorCode = messages.getErrorCode();
-        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
-            /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
-               Kafka server may delete old files from time to time */
-            System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
-
-            if (_retry >= MAX_RETRY_TIME)  return true;
-            _retry++;
-            // get the current offset range
-            _offsetRange = getOffsetRange();
-            _offset =  _offsetRange[0];
-            return false;
-        } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
-            throw new IOException(_input + " : wrong partition");
-        } else if (errorCode != ErrorMapping.NoError()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " error:" + errorCode);
-        } else
-            return false;
-    }
-    
     public static int getClientBufferSize(Props props) throws Exception {
         return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Sep 10 20:47:23 2012
@@ -34,7 +34,7 @@ object PartitionData {
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Sep 10 20:47:23 2012
@@ -76,9 +76,9 @@ case class ProducerRequest( versionId: S
       buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
       for(partitionData <- topicData.partitionDataArray) {
         buffer.putInt(partitionData.partition)
-        buffer.putInt(partitionData.messages.getSerialized().limit)
-        buffer.put(partitionData.messages.getSerialized())
-        partitionData.messages.getSerialized().rewind
+        buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
+        buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
+        partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
       }
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Mon Sep 10 20:47:23 2012
@@ -64,15 +64,6 @@ private[consumer] class PartitionTopicIn
     }
   }
 
-  /**
-   *  add an empty message with the exception to the queue so that client can see the error
-   */
-  def enqueueError(e: Throwable, fetchOffset: Long) = {
-    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
-      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
-  }
-
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Sep 10 20:47:23 2012
@@ -24,8 +24,7 @@ private[javaapi] object Implicits extend
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
-                                                   messageSet.getErrorCode)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
   }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -17,20 +17,14 @@
 package kafka.javaapi.message
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
-                                                                                              initialOffset,
-                                                                                              errorCode)
-  def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
+  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
+  def this(buffer: ByteBuffer) = this(buffer, 0L)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
-         0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
   }
 
   def this(messages: java.util.List[Message]) {
@@ -39,14 +33,6 @@ class ByteBufferMessageSet(private val b
 
   def validBytes: Long = underlying.validBytes
 
-  def serialized():ByteBuffer = underlying.getSerialized()
-
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
@@ -67,13 +53,13 @@ class ByteBufferMessageSet(private val b
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
+  override def hashCode: Int = underlying.hashCode
 
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Mon Sep 10 20:47:23 2012
@@ -247,7 +247,7 @@ private[kafka] class Log( val dir: File,
     logStats.recordAppendedMessages(numberOfMessages)
 
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
-    val validByteBuffer = messages.getBuffer.duplicate()
+    val validByteBuffer = messages.buffer.duplicate()
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -21,7 +21,7 @@ import kafka.utils.Logging
 import java.nio.ByteBuffer
 import java.nio.channels._
 import kafka.utils.IteratorTemplate
-import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException}
 
 /**
  * A sequence of messages stored in a byte buffer
@@ -33,29 +33,19 @@ import kafka.common.{MessageSizeTooLarge
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  * 
  */
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
+class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging {
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L)
   }
 
   def this(messages: Message*) {
     this(NoCompressionCodec, messages: _*)
   }
 
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
-  def getSerialized(): ByteBuffer = buffer
-
   def validBytes: Long = shallowValidBytes
 
   private def shallowValidBytes: Long = {
@@ -96,7 +86,6 @@ class ByteBufferMessageSet(private val b
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
-    ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
       var currValidBytes = initialOffset
@@ -192,12 +181,18 @@ class ByteBufferMessageSet(private val b
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+  override def hashCode: Int = {
+    var hash = 17
+    hash = hash * 31 + buffer.hashCode
+    hash = hash * 31 + initialOffset.hashCode
+    hash
+  }
+
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -40,8 +40,6 @@ class FileMessageSet private[kafka](priv
   
   private val setSize = new AtomicLong()
 
-  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
-
   if(mutable) {
     if(limit < Long.MaxValue || offset > 0)
       throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Mon Sep 10 20:47:23 2012
@@ -110,10 +110,5 @@ abstract class MessageSet extends Iterab
       if(!messageAndOffset.message.isValid)
         throw new InvalidMessageException
   }
-  
-  /**
-   * Used to allow children to have serialization on implementation
-   */
-  def getSerialized(): ByteBuffer
 
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 10 20:47:23 2012
@@ -317,15 +317,12 @@ class KafkaApis(val requestChannel: Requ
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match {
-          case Left(err) =>
-            BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
-            BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
-            new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-          case Right((messages, highWatermark)) =>
+        val partitionInfo =
+          try {
+            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            if(!isFetchFromFollower) {
+            if (!isFetchFromFollower) {
               new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
             } else {
               debug("Leader %d for topic %s partition %d received fetch request from follower %d"
@@ -334,7 +331,15 @@ class KafkaApis(val requestChannel: Requ
                 .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
               new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
             }
-        }
+          }
+          catch {
+            case e =>
+              BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
+              BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
+              error("error when processing request " + (topic, partition, offset, fetchSize), e)
+              new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+                                offset, -1L, MessageSet.Empty)
+          }
         info.append(partitionInfo)
       }
       fetchedData.append(new TopicData(topic, info.toArray))
@@ -346,31 +351,23 @@ class KafkaApis(val requestChannel: Requ
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
   private def readMessageSet(topic: String, partition: Int, offset: Long,
-                             maxSize: Int, fromFollower: Boolean): Either[Short, (MessageSet, Long)] = {
-    var response: Either[Short, (MessageSet, Long)] = null
-    try {
-      // check if the current broker is the leader for the partitions
-      val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
-      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-      val actualSize = if (!fromFollower) {
-        min(leader.highWatermark - offset, maxSize).toInt
-      } else {
-        maxSize
-      }
-      val messages = leader.log match {
-        case Some(log) =>
-          log.read(offset, actualSize)
-        case None =>
-          error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
-          MessageSet.Empty
-      }
-      response = Right(messages, leader.highWatermark)
-    } catch {
-      case e =>
-        error("error when processing request " + (topic, partition, offset, maxSize), e)
-        response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                             maxSize: Int, fromFollower: Boolean): (MessageSet, Long) = {
+    // check if the current broker is the leader for the partitions
+    val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+    val actualSize = if (!fromFollower) {
+      min(leader.highWatermark - offset, maxSize).toInt
+    } else {
+      maxSize
+    }
+    val messages = leader.log match {
+      case Some(log) =>
+        log.read(offset, actualSize)
+      case None =>
+        error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
+        MessageSet.Empty
     }
-    response
+    (messages, leader.highWatermark)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Sep 10 20:47:23 2012
@@ -25,7 +25,7 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.producer.ProducerData
 import kafka.utils.TestUtils
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit
     // send an invalid offset
     try {
       val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.messageSet(topic, 0).iterator
+      fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
       fail("Expected an OffsetOutOfRangeException exception to be thrown")
     } catch {
       case e: OffsetOutOfRangeException => 
@@ -109,9 +109,9 @@ class LazyInitProducerTest extends JUnit
 
       val request = builder.build()
       val responses = consumer.fetch(request)
-      for( (topic, offset) <- topicOffsets ) {
+      for(topicData <- responses.data) {
         try {
-          responses.messageSet(topic, offset).iterator
+          topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
           fail("Expected an OffsetOutOfRangeException exception to be thrown")
         } catch {
           case e: OffsetOutOfRangeException =>

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Sep 10 20:47:23 2012
@@ -189,8 +189,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -206,9 +205,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics) {
-          response.messageSet(topic, -1).iterator
-        }
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
@@ -256,8 +253,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -273,8 +269,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, _) <- topics)
-          response.messageSet(topic, -1).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Mon Sep 10 20:47:23 2012
@@ -17,7 +17,6 @@
 
 package kafka.javaapi.message
 
-import java.nio._
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
@@ -29,34 +28,10 @@ class ByteBufferMessageSetTest extends k
     new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
   
   @Test
-  def testValidBytes() {
+  def testEquals() {
     val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
-    val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
-    buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer)
-    assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
-  }
-
-  @Test
-  def testValidBytesWithCompression () {
-    val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
                                                                          new Message("there".getBytes())))
-    val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
-    buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
-    assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
-  }
-
-  @Test
-  def testEquals() {
-    val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                            messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
     val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                 messages = getMessageList(new Message("hello".getBytes()),
                                                                           new Message("there".getBytes())))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Mon Sep 10 20:47:23 2012
@@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
     // create a ByteBufferMessageSet that doesn't contain a full message
     // iterating it should get an InvalidMessageSizeException
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
-    val buffer = messages.getSerialized().slice
+    val buffer = messages.buffer.slice
     buffer.limit(10)
     val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
     try {
@@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
       val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
-      buffer.put(messages.getSerialized())
+      buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
       assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
@@ -65,6 +65,18 @@ class ByteBufferMessageSetTest extends B
   }
 
   @Test
+  def testValidBytesWithCompression() {
+    {
+      val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      buffer.put(messages.buffer)
+      buffer.putShort(4)
+      val messagesPlus = new ByteBufferMessageSet(buffer)
+      assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+    }
+  }
+
+  @Test
   def testEquals() {
     var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
     var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
@@ -93,7 +105,7 @@ class ByteBufferMessageSetTest extends B
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
 
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
@@ -107,7 +119,7 @@ class ByteBufferMessageSetTest extends B
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
 
       verifyShallowIterator(messageSet)
     }
@@ -117,16 +129,16 @@ class ByteBufferMessageSetTest extends B
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
-      buffer.put(emptyMessageSet.getSerialized())
-      buffer.put(regularMessgeSet.getSerialized())
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      buffer.put(emptyMessageSet.buffer)
+      buffer.put(regularMessgeSet.buffer)
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
 
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
@@ -138,16 +150,16 @@ class ByteBufferMessageSetTest extends B
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
-      buffer.put(emptyMessageSet.getSerialized())
-      buffer.put(regularMessgeSet.getSerialized())
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      buffer.put(emptyMessageSet.buffer)
+      buffer.put(regularMessgeSet.buffer)
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
 
       verifyShallowIterator(mixedMessageSet)
     }