You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/10 22:47:24 UTC
svn commit: r1383105 - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/
core/src/main/scala/kafka/javaapi/message/ c...
Author: junrao
Date: Mon Sep 10 20:47:23 2012
New Revision: 1383105
URL: http://svn.apache.org/viewvc?rev=1383105&view=rev
Log:
remove errorcode from ByteBufferMessageSet; patched by Swapnil Ghike; reviewed by Jay Kreps and Jun Rao; KAFKA-458
Modified:
incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Mon Sep 10 20:47:23 2012
@@ -24,7 +24,6 @@ import java.util.Iterator;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
-import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
@@ -136,7 +135,6 @@ public class KafkaETLContext {
while ( !gotNext && _respIterator.hasNext()) {
ByteBufferMessageSet msgSet = _respIterator.next();
- if ( hasError(msgSet)) return false;
_messageIt = msgSet.iterator();
gotNext = get(key, value);
}
@@ -249,36 +247,6 @@ public class KafkaETLContext {
return range;
}
- /**
- * Called by the default implementation of {@link #map} to check error code
- * to determine whether to continue.
- */
- protected boolean hasError(ByteBufferMessageSet messages)
- throws IOException {
- short errorCode = messages.getErrorCode();
- if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
- /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
- Kafka server may delete old files from time to time */
- System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
-
- if (_retry >= MAX_RETRY_TIME) return true;
- _retry++;
- // get the current offset range
- _offsetRange = getOffsetRange();
- _offset = _offsetRange[0];
- return false;
- } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
- throw new IOException(_input + " current offset=" + _offset
- + " : invalid offset.");
- } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
- throw new IOException(_input + " : wrong partition");
- } else if (errorCode != ErrorMapping.NoError()) {
- throw new IOException(_input + " current offset=" + _offset
- + " error:" + errorCode);
- } else
- return false;
- }
-
public static int getClientBufferSize(Props props) throws Exception {
return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Sep 10 20:47:23 2012
@@ -34,7 +34,7 @@ object PartitionData {
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize)
- new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+ new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Sep 10 20:47:23 2012
@@ -76,9 +76,9 @@ case class ProducerRequest( versionId: S
buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
for(partitionData <- topicData.partitionDataArray) {
buffer.putInt(partitionData.partition)
- buffer.putInt(partitionData.messages.getSerialized().limit)
- buffer.put(partitionData.messages.getSerialized())
- partitionData.messages.getSerialized().rewind
+ buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
+ buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
+ partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Mon Sep 10 20:47:23 2012
@@ -64,15 +64,6 @@ private[consumer] class PartitionTopicIn
}
}
- /**
- * add an empty message with the exception to the queue so that client can see the error
- */
- def enqueueError(e: Throwable, fetchOffset: Long) = {
- val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
- errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
- }
-
override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Sep 10 20:47:23 2012
@@ -24,8 +24,7 @@ private[javaapi] object Implicits extend
implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
kafka.javaapi.message.ByteBufferMessageSet = {
- new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
- messageSet.getErrorCode)
+ new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
}
implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -17,20 +17,14 @@
package kafka.javaapi.message
import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
import kafka.message._
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
- private val initialOffset: Long = 0L,
- private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
- val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
- initialOffset,
- errorCode)
- def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
+ val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
+ def this(buffer: ByteBuffer) = this(buffer, 0L)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
- this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
- 0L, ErrorMapping.NoError)
+ this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
}
def this(messages: java.util.List[Message]) {
@@ -39,14 +33,6 @@ class ByteBufferMessageSet(private val b
def validBytes: Long = underlying.validBytes
- def serialized():ByteBuffer = underlying.getSerialized()
-
- def getInitialOffset = initialOffset
-
- def getBuffer = buffer
-
- def getErrorCode = errorCode
-
override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
@@ -67,13 +53,13 @@ class ByteBufferMessageSet(private val b
override def equals(other: Any): Boolean = {
other match {
case that: ByteBufferMessageSet =>
- (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+ (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
case _ => false
}
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
- override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
+ override def hashCode: Int = underlying.hashCode
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Mon Sep 10 20:47:23 2012
@@ -247,7 +247,7 @@ private[kafka] class Log( val dir: File,
logStats.recordAppendedMessages(numberOfMessages)
// truncate the message set's buffer upto validbytes, before appending it to the on-disk log
- val validByteBuffer = messages.getBuffer.duplicate()
+ val validByteBuffer = messages.buffer.duplicate()
val messageSetValidBytes = messages.validBytes
if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -21,7 +21,7 @@ import kafka.utils.Logging
import java.nio.ByteBuffer
import java.nio.channels._
import kafka.utils.IteratorTemplate
-import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException}
/**
* A sequence of messages stored in a byte buffer
@@ -33,29 +33,19 @@ import kafka.common.{MessageSizeTooLarge
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
*
*/
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
- private val initialOffset: Long = 0L,
- private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
+class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging {
private var shallowValidByteCount = -1L
if(sizeInBytes > Int.MaxValue)
throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
def this(compressionCodec: CompressionCodec, messages: Message*) {
- this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
+ this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L)
}
def this(messages: Message*) {
this(NoCompressionCodec, messages: _*)
}
- def getInitialOffset = initialOffset
-
- def getBuffer = buffer
-
- def getErrorCode = errorCode
-
- def getSerialized(): ByteBuffer = buffer
-
def validBytes: Long = shallowValidBytes
private def shallowValidBytes: Long = {
@@ -96,7 +86,6 @@ class ByteBufferMessageSet(private val b
/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
- ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageAndOffset] {
var topIter = buffer.slice()
var currValidBytes = initialOffset
@@ -192,12 +181,18 @@ class ByteBufferMessageSet(private val b
override def equals(other: Any): Boolean = {
other match {
case that: ByteBufferMessageSet =>
- (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+ (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
case _ => false
}
}
override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
- override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+ override def hashCode: Int = {
+ var hash = 17
+ hash = hash * 31 + buffer.hashCode
+ hash = hash * 31 + initialOffset.hashCode
+ hash
+ }
+
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Mon Sep 10 20:47:23 2012
@@ -40,8 +40,6 @@ class FileMessageSet private[kafka](priv
private val setSize = new AtomicLong()
- def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
-
if(mutable) {
if(limit < Long.MaxValue || offset > 0)
throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Mon Sep 10 20:47:23 2012
@@ -110,10 +110,5 @@ abstract class MessageSet extends Iterab
if(!messageAndOffset.message.isValid)
throw new InvalidMessageException
}
-
- /**
- * Used to allow children to have serialization on implementation
- */
- def getSerialized(): ByteBuffer
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 10 20:47:23 2012
@@ -317,15 +317,12 @@ class KafkaApis(val requestChannel: Requ
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
- val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match {
- case Left(err) =>
- BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
- BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
- new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
- case Right((messages, highWatermark)) =>
+ val partitionInfo =
+ try {
+ val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
- if(!isFetchFromFollower) {
+ if (!isFetchFromFollower) {
new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
} else {
debug("Leader %d for topic %s partition %d received fetch request from follower %d"
@@ -334,7 +331,15 @@ class KafkaApis(val requestChannel: Requ
.format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
}
- }
+ }
+ catch {
+ case e =>
+ BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
+ BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
+ error("error when processing request " + (topic, partition, offset, fetchSize), e)
+ new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+ offset, -1L, MessageSet.Empty)
+ }
info.append(partitionInfo)
}
fetchedData.append(new TopicData(topic, info.toArray))
@@ -346,31 +351,23 @@ class KafkaApis(val requestChannel: Requ
* Read from a single topic/partition at the given offset upto maxSize bytes
*/
private def readMessageSet(topic: String, partition: Int, offset: Long,
- maxSize: Int, fromFollower: Boolean): Either[Short, (MessageSet, Long)] = {
- var response: Either[Short, (MessageSet, Long)] = null
- try {
- // check if the current broker is the leader for the partitions
- val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
- trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
- val actualSize = if (!fromFollower) {
- min(leader.highWatermark - offset, maxSize).toInt
- } else {
- maxSize
- }
- val messages = leader.log match {
- case Some(log) =>
- log.read(offset, actualSize)
- case None =>
- error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
- MessageSet.Empty
- }
- response = Right(messages, leader.highWatermark)
- } catch {
- case e =>
- error("error when processing request " + (topic, partition, offset, maxSize), e)
- response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ maxSize: Int, fromFollower: Boolean): (MessageSet, Long) = {
+ // check if the current broker is the leader for the partitions
+ val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+ trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+ val actualSize = if (!fromFollower) {
+ min(leader.highWatermark - offset, maxSize).toInt
+ } else {
+ maxSize
+ }
+ val messages = leader.log match {
+ case Some(log) =>
+ log.read(offset, actualSize)
+ case None =>
+ error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
+ MessageSet.Empty
}
- response
+ (messages, leader.highWatermark)
}
/**
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Sep 10 20:47:23 2012
@@ -25,7 +25,7 @@ import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.producer.ProducerData
import kafka.utils.TestUtils
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
@@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit
// send an invalid offset
try {
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
- fetchedWithError.messageSet(topic, 0).iterator
+ fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch {
case e: OffsetOutOfRangeException =>
@@ -109,9 +109,9 @@ class LazyInitProducerTest extends JUnit
val request = builder.build()
val responses = consumer.fetch(request)
- for( (topic, offset) <- topicOffsets ) {
+ for(topicData <- responses.data) {
try {
- responses.messageSet(topic, offset).iterator
+ topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch {
case e: OffsetOutOfRangeException =>
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Sep 10 20:47:23 2012
@@ -189,8 +189,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics)
- response.messageSet(topic, partition).iterator
+ response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
@@ -206,9 +205,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- response.messageSet(topic, -1).iterator
- }
+ response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: UnknownTopicOrPartitionException => "this is good"
@@ -256,8 +253,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics)
- response.messageSet(topic, partition).iterator
+ response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
@@ -273,8 +269,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, _) <- topics)
- response.messageSet(topic, -1).iterator
+ response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: UnknownTopicOrPartitionException => "this is good"
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Mon Sep 10 20:47:23 2012
@@ -17,7 +17,6 @@
package kafka.javaapi.message
-import java.nio._
import junit.framework.Assert._
import org.junit.Test
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
@@ -29,34 +28,10 @@ class ByteBufferMessageSetTest extends k
new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
@Test
- def testValidBytes() {
+ def testEquals() {
val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message("hello".getBytes()),
- new Message("there".getBytes())))
- val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
- buffer.put(messageList.getBuffer)
- buffer.putShort(4)
- val messageListPlus = new ByteBufferMessageSet(buffer)
- assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
- }
-
- @Test
- def testValidBytesWithCompression () {
- val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
- messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
- val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
- buffer.put(messageList.getBuffer)
- buffer.putShort(4)
- val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
- assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
- }
-
- @Test
- def testEquals() {
- val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message("hello".getBytes()),
- new Message("there".getBytes())))
val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1383105&r1=1383104&r2=1383105&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Mon Sep 10 20:47:23 2012
@@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
// create a ByteBufferMessageSet that doesn't contain a full message
// iterating it should get an InvalidMessageSizeException
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
- val buffer = messages.getSerialized().slice
+ val buffer = messages.buffer.slice
buffer.limit(10)
val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
try {
@@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B
{
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
- buffer.put(messages.getSerialized())
+ buffer.put(messages.buffer)
buffer.putShort(4)
val messagesPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
@@ -65,6 +65,18 @@ class ByteBufferMessageSetTest extends B
}
@Test
+ def testValidBytesWithCompression() {
+ {
+ val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+ val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+ buffer.put(messages.buffer)
+ buffer.putShort(4)
+ val messagesPlus = new ByteBufferMessageSet(buffer)
+ assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+ }
+ }
+
+ @Test
def testEquals() {
var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
@@ -93,7 +105,7 @@ class ByteBufferMessageSetTest extends B
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
- assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+ assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
@@ -107,7 +119,7 @@ class ByteBufferMessageSetTest extends B
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
- assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+ assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
verifyShallowIterator(messageSet)
}
@@ -117,16 +129,16 @@ class ByteBufferMessageSetTest extends B
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
- val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
- buffer.put(emptyMessageSet.getSerialized())
- buffer.put(regularMessgeSet.getSerialized())
+ val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+ buffer.put(emptyMessageSet.buffer)
+ buffer.put(regularMessgeSet.buffer)
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
- assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+ assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
@@ -138,16 +150,16 @@ class ByteBufferMessageSetTest extends B
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
- val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
- buffer.put(emptyMessageSet.getSerialized())
- buffer.put(regularMessgeSet.getSerialized())
+ val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+ buffer.put(emptyMessageSet.buffer)
+ buffer.put(regularMessgeSet.buffer)
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
- assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+ assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
verifyShallowIterator(mixedMessageSet)
}