You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/17 22:16:00 UTC
svn commit: r1386806 [1/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/...
Author: jjkoshy
Date: Mon Sep 17 20:15:59 2012
New Revision: 1386806
URL: http://svn.apache.org/viewvc?rev=1386806&view=rev
Log:
KAFKA-391 Refactor fetch/producer requests to use maps instead of several arrays; patched by Joel Koshy; reviewed by Jun Rao.
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/FetchRequestFormatException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/
Modified:
incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java Mon Sep 17 20:15:59 2012
@@ -26,10 +26,10 @@ import java.util.Random;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
-import kafka.producer.SyncProducerConfig;
+import kafka.producer.ProducerConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -47,7 +47,7 @@ public class DataGenerator {
System.currentTimeMillis());
protected Props _props;
- protected SyncProducer _producer = null;
+ protected Producer _producer = null;
protected URI _uri = null;
protected String _topic;
protected int _count;
@@ -70,12 +70,12 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
- producerProps.put("host", _uri.getHost());
- producerProps.put("port", String.valueOf(_uri.getPort()));
+ producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
- _producer = new SyncProducer(new SyncProducerConfig(producerProps));
+
+ _producer = new Producer(new ProducerConfig(producerProps));
}
@@ -91,7 +91,8 @@ public class DataGenerator {
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
- _producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
+ ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
+ _producer.send(pd);
// close the producer
_producer.close();
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Mon Sep 17 20:15:59 2012
@@ -18,60 +18,13 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.utils.Utils
-import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
-import kafka.common.{KafkaException, FetchRequestFormatException}
-
-object OffsetDetail {
-
- def readFrom(buffer: ByteBuffer): OffsetDetail = {
- val topic = Utils.readShortString(buffer, "UTF-8")
-
- val partitionsCount = buffer.getInt
- val partitions = new Array[Int](partitionsCount)
- for (i <- 0 until partitions.length)
- partitions(i) = buffer.getInt
-
- val offsetsCount = buffer.getInt
- val offsets = new Array[Long](offsetsCount)
- for (i <- 0 until offsets.length)
- offsets(i) = buffer.getLong
-
- val fetchesCount = buffer.getInt
- val fetchSizes = new Array[Int](fetchesCount)
- for (i <- 0 until fetchSizes.length)
- fetchSizes(i) = buffer.getInt
+import kafka.utils.{nonthreadsafe, Utils}
+import scala.collection.immutable.Map
+import kafka.common.TopicAndPartition
- new OffsetDetail(topic, partitions, offsets, fetchSizes)
- }
-
-}
-
-case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
-
- def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
-
- if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
- throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
-
- buffer.putInt(partitions.length)
- partitions.foreach(buffer.putInt(_))
- buffer.putInt(offsets.length)
- offsets.foreach(buffer.putLong(_))
-
- buffer.putInt(fetchSizes.length)
- fetchSizes.foreach(buffer.putInt(_))
- }
+case class PartitionFetchInfo(offset: Long, fetchSize: Int)
- def sizeInBytes(): Int = {
- 2 + topic.length() + // topic string
- partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int)
- offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long)
- fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size
- }
-}
object FetchRequest {
val CurrentVersion = 1.shortValue()
@@ -85,18 +38,23 @@ object FetchRequest {
def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
- val clientId = Utils.readShortString(buffer, "UTF-8")
+ val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val replicaId = buffer.getInt
val maxWait = buffer.getInt
val minBytes = buffer.getInt
- val offsetsCount = buffer.getInt
- val offsetInfo = new Array[OffsetDetail](offsetsCount)
- for(i <- 0 until offsetInfo.length)
- offsetInfo(i) = OffsetDetail.readFrom(buffer)
-
- new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val offset = buffer.getLong
+ val fetchSize = buffer.getInt
+ (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
+ })
+ })
+ FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
}
-
}
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -105,50 +63,63 @@ case class FetchRequest(versionId: Short
replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
- offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+ requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
+ extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
- // ensure that a topic "X" appears in at most one OffsetDetail
- def validate() {
- if(offsetInfo == null)
- throw new FetchRequestFormatException("FetchRequest has null offsetInfo")
-
- // We don't want to get fancy with groupBy's and filter's since we just want the first occurrence
- var topics = Set[String]()
- val iter = offsetInfo.iterator
- while(iter.hasNext) {
- val offsetData = iter.next()
- val topic = offsetData.topic
- if(topics.contains(topic))
- throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
- else
- topics += topic
- }
- }
+ /**
+ * Partitions the request info into a map of maps (one for each topic).
+ */
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
def writeTo(buffer: ByteBuffer) {
- // validate first
- validate()
-
buffer.putShort(versionId)
buffer.putInt(correlationId)
- Utils.writeShortString(buffer, clientId, "UTF-8")
+ Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
buffer.putInt(replicaId)
buffer.putInt(maxWait)
buffer.putInt(minBytes)
- buffer.putInt(offsetInfo.size)
- for(topicDetail <- offsetInfo) {
- topicDetail.writeTo(buffer)
+ buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+ requestInfoGroupedByTopic.foreach {
+ case (topic, partitionFetchInfos) =>
+ Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+ buffer.putInt(partitionFetchInfos.size) // partition count
+ partitionFetchInfos.foreach {
+ case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
+ buffer.putInt(partition)
+ buffer.putLong(offset)
+ buffer.putInt(fetchSize)
+ }
}
}
- def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+ def sizeInBytes: Int = {
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+ 4 + /* replicaId */
+ 4 + /* maxWait */
+ 4 + /* minBytes */
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+ val (topic, partitionFetchInfos) = currTopic
+ foldedTopics +
+ Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+ 4 + /* partition count */
+ partitionFetchInfos.size * (
+ 4 + /* partition id */
+ 8 + /* offset */
+ 4 /* fetch size */
+ )
+ })
+ }
- def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+ def isFromFollower = replicaId != FetchRequest.NonFollowerId
- def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId
+ def numPartitions = requestInfo.size
}
+@nonthreadsafe
class FetchRequestBuilder() {
private var correlationId = FetchRequest.DefaultCorrelationId
private val versionId = FetchRequest.CurrentVersion
@@ -156,13 +127,10 @@ class FetchRequestBuilder() {
private var replicaId = FetchRequest.DefaultReplicaId
private var maxWait = FetchRequest.DefaultMaxWait
private var minBytes = FetchRequest.DefaultMinBytes
- private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
+ private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
- val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
- topicData._1.append(partition)
- topicData._2.append(offset)
- topicData._3.append(fetchSize)
+ requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
this
}
@@ -191,10 +159,5 @@ class FetchRequestBuilder() {
this
}
- def build() = {
- val offsetDetails = requestMap.map{ topicData =>
- new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
- }
- new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
- }
+ def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Sep 17 20:15:59 2012
@@ -19,27 +19,35 @@ package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send}
import kafka.utils.Utils
object PartitionData {
def readFrom(buffer: ByteBuffer): PartitionData = {
- val error = buffer.getShort
val partition = buffer.getInt
+ val error = buffer.getShort
val initialOffset = buffer.getLong
- val hw = buffer.getLong()
+ val hw = buffer.getLong
val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize)
new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
}
+
+ val headerSize =
+ 4 + /* partition */
+ 2 + /* error code */
+ 8 + /* initialOffset */
+ 8 + /* high watermark */
+ 4 /* messageSetSize */
}
case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
- val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+
+ val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
}
@@ -50,17 +58,17 @@ class PartitionDataSend(val partitionDat
private val messageSize = partitionData.messages.sizeInBytes
private var messagesSentSize = 0L
- private val buffer = ByteBuffer.allocate(26)
- buffer.putShort(partitionData.error)
+ private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
buffer.putInt(partitionData.partition)
+ buffer.putShort(partitionData.error)
buffer.putLong(partitionData.initialOffset)
buffer.putLong(partitionData.hw)
buffer.putInt(partitionData.messages.sizeInBytes.intValue())
buffer.rewind()
- def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+ override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
- def writeTo(channel: GatheringByteChannel): Int = {
+ override def writeTo(channel: GatheringByteChannel): Int = {
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
@@ -75,63 +83,43 @@ class PartitionDataSend(val partitionDat
object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
- val topic = Utils.readShortString(buffer, "UTF-8")
+ val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val partitionCount = buffer.getInt
- val partitions = new Array[PartitionData](partitionCount)
- for(i <- 0 until partitionCount)
- partitions(i) = PartitionData.readFrom(buffer)
- new TopicData(topic, partitions.sortBy(_.partition))
- }
-
- def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
- if(data == null || data.size == 0)
- return None
-
- var (low, high) = (0, data.size-1)
- while(low <= high) {
- val mid = (low + high) / 2
- val found = data(mid)
- if(found.partition == partition)
- return Some(found)
- else if(partition < found.partition)
- high = mid - 1
- else
- low = mid + 1
- }
- None
+ val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
+ val partitionData = PartitionData.readFrom(buffer)
+ (TopicAndPartition(topic, partitionData.partition), partitionData)
+ })
+ TopicData(topic, Map(topicPartitionDataPairs:_*))
}
+
+ def headerSize(topic: String) =
+ Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+ 4 /* partition count */
}
-case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) {
- val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData]) {
+ val sizeInBytes =
+ TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
- // need to override equals due to brokern java-arrays equals functionality
- override def equals(other: Any): Boolean = {
- other match {
- case that: TopicData =>
- ( topic == that.topic &&
- partitionDataArray.toSeq == that.partitionDataArray.toSeq )
- case _ => false
- }
- }
+ val headerSize = TopicData.headerSize(topic)
}
class TopicDataSend(val topicData: TopicData) extends Send {
- val size = topicData.sizeInBytes
+ private val size = topicData.sizeInBytes
- var sent = 0
+ private var sent = 0
- private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
- Utils.writeShortString(buffer, topicData.topic, "UTF-8")
- buffer.putInt(topicData.partitionDataArray.length)
+ override def complete = sent >= size
+
+ private val buffer = ByteBuffer.allocate(topicData.headerSize)
+ Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
+ buffer.putInt(topicData.partitionData.size)
buffer.rewind()
- val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) {
- val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes)
+ val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
+ val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
}
- def complete = sent >= size
-
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
@@ -146,68 +134,87 @@ class TopicDataSend(val topicData: Topic
}
+object FetchResponse {
+ val headerSize =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ 4 /* topic count */
-object FetchResponse {
def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort
- val errorCode = buffer.getShort
val correlationId = buffer.getInt
- val dataCount = buffer.getInt
- val data = new Array[TopicData](dataCount)
- for(i <- 0 until data.length)
- data(i) = TopicData.readFrom(buffer)
- new FetchResponse(versionId, correlationId, data, errorCode)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topicData = TopicData.readFrom(buffer)
+ topicData.partitionData.values.map(
+ partitionData => (TopicAndPartition(topicData.topic, partitionData.partition), partitionData)
+ )
+ })
+ FetchResponse(versionId, correlationId, Map(pairs:_*))
}
}
case class FetchResponse(versionId: Short,
correlationId: Int,
- data: Array[TopicData],
- errorCode: Short = ErrorMapping.NoError) {
+ data: Map[TopicAndPartition, PartitionData]) {
- val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
+ /**
+ * Partitions the data into a map of maps (one for each topic).
+ */
+ lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
- lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
+ val sizeInBytes =
+ FetchResponse.headerSize +
+ dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
+ val topicData = TopicData(curr._1, curr._2)
+ folded +
+ topicData.sizeInBytes
+ })
- def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
- val messageSet = topicMap.get(topic) match {
- case Some(topicData) =>
- TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty)
- case None =>
- MessageSet.Empty
+ private def partitionDataFor(topic: String, partition: Int): PartitionData = {
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ data.get(topicAndPartition) match {
+ case Some(partitionData) => partitionData
+ case _ =>
+ throw new IllegalArgumentException(
+ "No partition %s in fetch response %s".format(topicAndPartition, this.toString))
}
- messageSet.asInstanceOf[ByteBufferMessageSet]
}
- def highWatermark(topic: String, partition: Int): Long = {
- topicMap.get(topic) match {
- case Some(topicData) =>
- TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
- case None => -1L
- }
- }
+ def messageSet(topic: String, partition: Int): ByteBufferMessageSet =
+ partitionDataFor(topic, partition).messages.asInstanceOf[ByteBufferMessageSet]
+
+ def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
+
+ def hasError = data.values.exists(_.error != ErrorMapping.NoError)
+
+ def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
}
class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
private val size = fetchResponse.sizeInBytes
+
private var sent = 0
-
- private val buffer = ByteBuffer.allocate(16)
+
+ private val sendSize = 4 /* for size */ + size
+
+ override def complete = sent >= sendSize
+
+ private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
buffer.putInt(size)
buffer.putShort(fetchResponse.versionId)
- buffer.putShort(fetchResponse.errorCode)
buffer.putInt(fetchResponse.correlationId)
- buffer.putInt(fetchResponse.data.length)
+ buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
buffer.rewind()
-
- val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
- val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
- }
- def complete = sent >= sendSize
+ val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
+ case(topic, data) => new TopicDataSend(TopicData(topic, data))
+ }) {
+ val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
+ }
def writeTo(channel: GatheringByteChannel):Int = {
expectIncomplete()
@@ -220,6 +227,5 @@ class FetchResponseSend(val fetchRespons
sent += written
written
}
-
- def sendSize = 4 + fetchResponse.sizeInBytes
}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Sep 17 20:15:59 2012
@@ -20,96 +20,105 @@ package kafka.api
import java.nio._
import kafka.message._
import kafka.utils._
+import scala.collection.Map
+import kafka.common.TopicAndPartition
object ProducerRequest {
val CurrentVersion: Short = 0
-
+
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort
val correlationId: Int = buffer.getInt
- val clientId: String = Utils.readShortString(buffer, "UTF-8")
+ val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val requiredAcks: Short = buffer.getShort
val ackTimeoutMs: Int = buffer.getInt
//build the topic structure
val topicCount = buffer.getInt
- val data = new Array[TopicData](topicCount)
- for(i <- 0 until topicCount) {
- val topic = Utils.readShortString(buffer, "UTF-8")
-
+ val partitionDataPairs = (1 to topicCount).flatMap(_ => {
+ // process topic
+ val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val partitionCount = buffer.getInt
- //build the partition structure within this topic
- val partitionData = new Array[PartitionData](partitionCount)
- for (j <- 0 until partitionCount) {
+ (1 to partitionCount).map(_ => {
val partition = buffer.getInt
val messageSetSize = buffer.getInt
val messageSetBuffer = new Array[Byte](messageSetSize)
buffer.get(messageSetBuffer,0,messageSetSize)
- partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
- }
- data(i) = new TopicData(topic,partitionData)
- }
- new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+ (TopicAndPartition(topic, partition),
+ new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+ })
+ })
+
+ ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
}
}
-case class ProducerRequest( versionId: Short,
+case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion,
correlationId: Int,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+ data: Map[TopicAndPartition, PartitionData])
+ extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
- def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+ /**
+ * Partitions the data into a map of maps (one for each topic).
+ */
+ private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+
+ def this(correlationId: Int,
+ clientId: String,
+ requiredAcks: Short,
+ ackTimeoutMs: Int,
+ data: Map[TopicAndPartition, PartitionData]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
- Utils.writeShortString(buffer, clientId, "UTF-8")
+ Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
buffer.putShort(requiredAcks)
buffer.putInt(ackTimeoutMs)
+
//save the topic structure
- buffer.putInt(data.size) //the number of topics
- for(topicData <- data) {
- Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
- buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
- for(partitionData <- topicData.partitionDataArray) {
- buffer.putInt(partitionData.partition)
- buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
- buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
- partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
- }
+ buffer.putInt(dataGroupedByTopic.size) //the number of topics
+ dataGroupedByTopic.foreach {
+ case (topic, topicAndPartitionData) =>
+ Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
+ buffer.putInt(topicAndPartitionData.size) //the number of partitions
+ topicAndPartitionData.foreach(partitionAndData => {
+ val partitionData = partitionAndData._2
+ buffer.putInt(partitionData.partition)
+ buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
+ buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
+ partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
+ })
}
}
- def sizeInBytes(): Int = {
- var size = 0
- //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
- size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
- for(topicData <- data) {
- size += 2 + topicData.topic.length + 4
- for(partitionData <- topicData.partitionDataArray) {
- size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
+ def sizeInBytes: Int = {
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
+ 2 + /* requiredAcks */
+ 4 + /* ackTimeoutMs */
+ 4 + /* number of topics */
+ dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+ foldedTopics +
+ Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+ 4 + /* the number of partitions */
+ {
+ currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
+ foldedPartitions +
+ 4 + /* partition id */
+ 4 + /* byte-length of serialized messages */
+ currPartition._2.messages.sizeInBytes.toInt
+ })
}
- }
- size
- }
-
- // need to override case-class equals due to broken java-array equals()
- override def equals(other: Any): Boolean = {
- other match {
- case that: ProducerRequest =>
- ( correlationId == that.correlationId &&
- clientId == that.clientId &&
- requiredAcks == that.requiredAcks &&
- ackTimeoutMs == that.ackTimeoutMs &&
- data.toSeq == that.data.toSeq )
- case _ => false
- }
+ })
}
- def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
+ def numPartitions = data.size
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Mon Sep 17 20:15:59 2012
@@ -18,57 +18,81 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.utils.Utils
+import scala.collection.Map
+import kafka.common.{TopicAndPartition, ErrorMapping}
object ProducerResponse {
def readFrom(buffer: ByteBuffer): ProducerResponse = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- val errorsSize = buffer.getInt
- val errors = new Array[Short](errorsSize)
- for( i <- 0 until errorsSize) {
- errors(i) = buffer.getShort
- }
- val offsetsSize = buffer.getInt
- val offsets = new Array[Long](offsetsSize)
- for( i <- 0 until offsetsSize) {
- offsets(i) = buffer.getLong
- }
- new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
+ val topicCount = buffer.getInt
+ val statusPairs = (1 to topicCount).flatMap(_ => {
+ val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partition = buffer.getInt
+ val error = buffer.getShort
+ val offset = buffer.getLong
+ (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset))
+ })
+ })
+
+ ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
}
}
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
- offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
- val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponseStatus(error: Short, nextOffset: Long)
+
+
+case class ProducerResponse(versionId: Short,
+ correlationId: Int,
+ status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
+
+ /**
+ * Partitions the status map into a map of maps (one for each topic).
+ */
+ private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+ def hasError = status.values.exists(_.error != ErrorMapping.NoError)
+
+ val sizeInBytes = {
+ val groupedStatus = statusGroupedByTopic
+ 2 + /* version id */
+ 4 + /* correlation id */
+ 4 + /* topic count */
+ groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+ foldedTopics +
+ Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+ 4 + /* partition count for this topic */
+ currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
+ foldedPartitions +
+ 4 + /* partition id */
+ 2 + /* error code */
+ 8 /* offset */
+ })
+ })
+ }
def writeTo(buffer: ByteBuffer) {
- /* version id */
+ val groupedStatus = statusGroupedByTopic
+
buffer.putShort(versionId)
- /* correlation id */
buffer.putInt(correlationId)
- /* error code */
- buffer.putShort(errorCode)
- /* errors */
- buffer.putInt(errors.length)
- errors.foreach(buffer.putShort(_))
- /* offsets */
- buffer.putInt(offsets.length)
- offsets.foreach(buffer.putLong(_))
- }
+ buffer.putInt(groupedStatus.size) // topic count
- // need to override case-class equals due to broken java-array equals()
- override def equals(other: Any): Boolean = {
- other match {
- case that: ProducerResponse =>
- ( correlationId == that.correlationId &&
- versionId == that.versionId &&
- errorCode == that.errorCode &&
- errors.toSeq == that.errors.toSeq &&
- offsets.toSeq == that.offsets.toSeq)
- case _ => false
- }
+ groupedStatus.foreach(topicStatus => {
+ val (topic, errorsAndOffsets) = topicStatus
+ Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+ buffer.putInt(errorsAndOffsets.size) // partition count
+ errorsAndOffsets.foreach {
+ case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>
+ buffer.putInt(partition)
+ buffer.putShort(error)
+ buffer.putLong(nextOffset)
+ }
+ })
}
-}
\ No newline at end of file
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Mon Sep 17 20:15:59 2012
@@ -19,6 +19,12 @@ package kafka.api
import java.nio._
+
+object RequestOrResponse {
+ val DefaultCharset = "UTF-8"
+}
+
+
private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
def sizeInBytes: Int
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Sep 17 20:15:59 2012
@@ -34,13 +34,12 @@ object ErrorMapping {
val InvalidMessageCode : Short = 2
val UnknownTopicOrPartitionCode : Short = 3
val InvalidFetchSizeCode : Short = 4
- val InvalidFetchRequestFormatCode : Short = 5
- val LeaderNotAvailableCode : Short = 6
- val NotLeaderForPartitionCode : Short = 7
- val RequestTimedOutCode: Short = 8
- val BrokerNotAvailableCode: Short = 9
- val ReplicaNotAvailableCode: Short = 10
- val MessageSizeTooLargeCode: Short = 11
+ val LeaderNotAvailableCode : Short = 5
+ val NotLeaderForPartitionCode : Short = 6
+ val RequestTimedOutCode: Short = 7
+ val BrokerNotAvailableCode: Short = 8
+ val ReplicaNotAvailableCode: Short = 9
+ val MessageSizeTooLargeCode: Short = 10
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -48,7 +47,6 @@ object ErrorMapping {
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
- classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala?rev=1386806&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/TopicAndPartition.scala Mon Sep 17 20:15:59 2012
@@ -0,0 +1,29 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (topic, partition) pairs are ubiquitous.
+ */
+case class TopicAndPartition(topic: String, partition: Int) {
+
+ def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
+
+ def asTuple = (topic, partition)
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Mon Sep 17 20:15:59 2012
@@ -25,6 +25,7 @@ import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.ZkUtils._
import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.common.TopicAndPartition
/**
@@ -38,7 +39,7 @@ class ConsumerFetcherManager(private val
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
private var cluster: Cluster = null
- private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+ private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
@@ -48,21 +49,22 @@ class ConsumerFetcherManager(private val
try {
if (noLeaderPartitionSet.isEmpty)
cond.await()
- for ((topic, partitionId) <- noLeaderPartitionSet) {
- // find the leader for this partition
- getLeaderForPartition(zkClient, topic, partitionId) match {
- case Some(leaderId) =>
- cluster.getBroker(leaderId) match {
- case Some(broker) =>
- val pti = partitionMap((topic, partitionId))
- addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
- noLeaderPartitionSet.remove((topic, partitionId))
- case None =>
- error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
- .format(leaderId, topic, partitionId))
- }
- case None => // let it go since we will keep retrying
- }
+ noLeaderPartitionSet.foreach {
+ case(TopicAndPartition(topic, partitionId)) =>
+ // find the leader for this partition
+ getLeaderForPartition(zkClient, topic, partitionId) match {
+ case Some(leaderId) =>
+ cluster.getBroker(leaderId) match {
+ case Some(broker) =>
+ val pti = partitionMap((topic, partitionId))
+ addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+ noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
+ case None =>
+ error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+ .format(leaderId, topic, partitionId))
+ }
+ case None => // let it go since we will keep retrying
+ }
}
} finally {
lock.unlock()
@@ -84,7 +86,7 @@ class ConsumerFetcherManager(private val
try {
partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster
- noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+ noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
} finally {
lock.unlock()
@@ -117,7 +119,7 @@ class ConsumerFetcherManager(private val
pti
}
- def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+ def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
debug("adding partitions with error %s".format(partitionList))
lock.lock()
try {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -21,6 +21,8 @@ import kafka.cluster.Broker
import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.common.TopicAndPartition
+
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
@@ -57,7 +59,7 @@ class ConsumerFetcherThread(name: String
}
// any logic for partitions whose leader has changed
- def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+ def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
consumerFetcherManager.addPartitionsWithError(partitions)
}
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala?rev=1386806&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchRequest.scala Mon Sep 17 20:15:59 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import scala.collection.JavaConversions
+import kafka.api.PartitionFetchInfo
+import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+
+
+class FetchRequest(correlationId: Int,
+ clientId: String,
+ replicaId: Int,
+ maxWait: Int,
+ minBytes: Int,
+ requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
+
+ val underlying = {
+ val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ kafka.api.FetchRequest(
+ correlationId = correlationId,
+ clientId = clientId,
+ replicaId = replicaId,
+ maxWait = maxWait,
+ minBytes = minBytes,
+ requestInfo = scalaMap
+ )
+ }
+
+ def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+ def sizeInBytes = underlying.sizeInBytes
+
+ override def toString = underlying.toString
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
+ this.underlying.equals(otherFetchRequest.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
+
+ override def hashCode = underlying.hashCode
+
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Mon Sep 17 20:15:59 2012
@@ -17,17 +17,33 @@
package kafka.javaapi
-import kafka.api.TopicData
+import kafka.api.PartitionData
+import kafka.common.TopicAndPartition
class FetchResponse( val versionId: Short,
val correlationId: Int,
- private val data: Array[TopicData] ) {
+ private val data: Map[TopicAndPartition, PartitionData] ) {
- private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
+ private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
import Implicits._
underlying.messageSet(topic, partition)
}
+
+ def highWatermark(topic: String, partition: Int) = underlying.highWatermark(topic, partition)
+
+ def hasError = underlying.hasError
+
+ def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition)
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
+ this.underlying.equals(otherFetchResponse.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
+
+ override def hashCode = underlying.hashCode
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Mon Sep 17 20:15:59 2012
@@ -17,9 +17,8 @@
package kafka.javaapi.consumer
-import kafka.api.FetchRequest
-import kafka.javaapi.FetchResponse
import kafka.utils.threadsafe
+import kafka.javaapi.FetchResponse
/**
* A consumer of kafka messages
@@ -32,15 +31,28 @@ class SimpleConsumer(val host: String,
val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
/**
- * Fetch a set of messages from a topic.
+ * Fetch a set of messages from a topic. This version of the fetch method
+ * takes the Scala version of a fetch request (i.e.,
+ * [[kafka.api.FetchRequest]] and is intended for use with the
+ * [[kafka.api.FetchRequestBuilder]].
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages
*/
- def fetch(request: FetchRequest): FetchResponse = {
+ def fetch(request: kafka.api.FetchRequest): FetchResponse = {
import kafka.javaapi.Implicits._
underlying.fetch(request)
}
+
+ /**
+ * Fetch a set of messages from a topic.
+ *
+ * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+ * @return a set of fetched messages
+ */
+ def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = {
+ fetch(request.underlying)
+ }
/**
* Get a list of valid offsets (up to maxSize) before the given time.
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Mon Sep 17 20:15:59 2012
@@ -23,12 +23,14 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.nio.ByteBuffer
import kafka.api._
+import kafka.common.TopicAndPartition
+
object RequestChannel {
val AllDone = new Request(1, 2, getShutdownReceive(), 0)
def getShutdownReceive() = {
- val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+ val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionData]())
val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
byteBuffer.putShort(RequestKeys.ProduceKey)
emptyProducerRequest.writeTo(byteBuffer)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Sep 17 20:15:59 2012
@@ -178,4 +178,4 @@ class SyncProducer(val config: SyncProdu
object ProducerRequestStat extends KafkaMetricsGroup {
val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
val requestSizeHist = newHistogram("ProducerRequestSize")
-}
\ No newline at end of file
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Sep 17 20:15:59 2012
@@ -18,13 +18,13 @@
package kafka.producer.async
import kafka.common._
-import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
-import scala.collection.Map
+import scala.collection.{Seq, Map}
import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api._
+import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -81,12 +81,13 @@ class DefaultEventHandler[K,V](config: P
val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
- for( (topic, partition) <- failedTopicPartitions ) {
- eventsPerBrokerMap.get((topic, partition)) match {
+ failedTopicPartitions.foreach(topicPartition => {
+ eventsPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
+
}
- }
+ })
}
} catch {
case t: Throwable => error("Failed to send messages", t)
@@ -112,7 +113,7 @@ class DefaultEventHandler[K,V](config: P
else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
- error("Error serializing message " + t)
+ error("Error serializing message ", t)
}
}
}
@@ -122,8 +123,8 @@ class DefaultEventHandler[K,V](config: P
serializedProducerData
}
- def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
- val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
+ def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
+ val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
try {
for (event <- events) {
val topicPartitionsList = getPartitionListForTopic(event)
@@ -135,16 +136,16 @@ class DefaultEventHandler[K,V](config: P
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
- var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+ var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
- dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+ dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
case None =>
- dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+ dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker)
}
- val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
+ val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
@@ -199,39 +200,30 @@ class DefaultEventHandler[K,V](config: P
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/
- private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
+ private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
- warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
+ warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
- val topics = new HashMap[String, ListBuffer[PartitionData]]()
- for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
- val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
- partitionData.append(new PartitionData(partitionId, messagesSet))
+ val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
+ case (topicAndPartition, messages) =>
+ (topicAndPartition, new PartitionData(topicAndPartition.partition, messages))
}
- val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
- config.requestTimeoutMs, topicData)
+ config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
try {
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
- trace("producer sent messages for topics %s to broker %d on %s:%d"
+ trace("Producer sent messages for topics %s to broker %d on %s:%d"
.format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
- var msgIdx = -1
- val errors = new ListBuffer[(String, Int)]
- for( topic <- topicData; partition <- topic.partitionDataArray ) {
- msgIdx += 1
- if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) {
- errors.append((topic.topic, partition.partition))
- if (msgIdx < response.errors.size)
- warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) +
- "from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
- }
- }
- errors
+ if (response.status.size != producerRequest.data.size)
+ throw new KafkaException("Incomplete response (%s) for producer request (%s)"
+ .format(response, producerRequest))
+ response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+ .map(partitionStatus => partitionStatus._1)
} catch {
- case e =>
- warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e)
+ case t: Throwable =>
+ warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t)
messagesPerTopic.keys.toSeq
}
} else {
@@ -239,7 +231,7 @@ class DefaultEventHandler[K,V](config: P
}
}
- private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = {
+ private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
@@ -255,25 +247,23 @@ class DefaultEventHandler[K,V](config: P
( topicAndPartition,
config.compressionCodec match {
case NoCompressionCodec =>
- trace("Sending %d messages with no compression to topic %s on partition %d"
- .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+ trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
- trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ trace("Sending %d messages with compression codec %d to %s"
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*)
case _ =>
- if(config.compressedTopics.contains(topicAndPartition._1)) {
- trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ if(config.compressedTopics.contains(topicAndPartition.topic)) {
+ trace("Sending %d messages with compression codec %d to %s"
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*)
}
else {
- trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
- .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
- config.compressedTopics.toString))
+ trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+ .format(messages.size, topicAndPartition, config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -19,7 +19,7 @@ package kafka.server
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit
abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
extends ShutdownableThread(name) {
- private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
+ private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
private val fetchMapLock = new Object
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
@@ -50,7 +50,7 @@ abstract class AbstractFetcherThread(na
def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
// deal with partitions with errors, potentially due to leadership changes
- def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
+ def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
override def shutdown(){
super.shutdown()
@@ -65,12 +65,15 @@ abstract class AbstractFetcherThread(na
minBytes(minBytes)
fetchMapLock synchronized {
- for ( ((topic, partitionId), offset) <- fetchMap )
- builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
+ fetchMap.foreach {
+ case((topicAndPartition, offset)) =>
+ builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+ offset, fetchSize)
+ }
}
val fetchRequest = builder.build()
- val partitionsWithError = new mutable.HashSet[(String, Int)]
+ val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
@@ -90,37 +93,35 @@ abstract class AbstractFetcherThread(na
if (response != null) {
// process fetched data
fetchMapLock synchronized {
- for ( topicData <- response.data ) {
- for ( partitionData <- topicData.partitionDataArray) {
- val topic = topicData.topic
- val partitionId = partitionData.partition
- val key = (topic, partitionId)
- val currentOffset = fetchMap.get(key)
+ response.data.foreach {
+ case(topicAndPartition, partitionData) =>
+ val (topic, partitionId) = topicAndPartition.asTuple
+ val currentOffset = fetchMap.get(topicAndPartition)
if (currentOffset.isDefined) {
partitionData.error match {
case ErrorMapping.NoError =>
processPartitionData(topic, currentOffset.get, partitionData)
val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
- val newOffset = currentOffset.get + validBytes
- fetchMap.put(key, newOffset)
+ val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+ fetchMap.put(topicAndPartition, newOffset)
FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
fetcherMetrics.byteRate.mark(validBytes)
case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topic, partitionId)
- fetchMap.put(key, newOffset)
+ fetchMap.put(topicAndPartition, newOffset)
warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
- .format(currentOffset.get, topic, partitionId, newOffset))
+ .format(currentOffset.get, topic, partitionId, newOffset))
case _ =>
error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
- ErrorMapping.exceptionFor(partitionData.error))
- partitionsWithError += key
- fetchMap.remove(key)
+ ErrorMapping.exceptionFor(partitionData.error))
+ partitionsWithError += topicAndPartition
+ fetchMap.remove(topicAndPartition)
}
}
- }
}
}
}
+
if (partitionsWithError.size > 0) {
debug("handling partitions with error for %s".format(partitionsWithError))
handlePartitionsWithErrors(partitionsWithError)
@@ -129,19 +130,19 @@ abstract class AbstractFetcherThread(na
def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
fetchMapLock synchronized {
- fetchMap.put((topic, partitionId), initialOffset)
+ fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset)
}
}
def removePartition(topic: String, partitionId: Int) {
fetchMapLock synchronized {
- fetchMap.remove((topic, partitionId))
+ fetchMap.remove(TopicAndPartition(topic, partitionId))
}
}
def hasPartition(topic: String, partitionId: Int): Boolean = {
fetchMapLock synchronized {
- fetchMap.get((topic, partitionId)).isDefined
+ fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
}
}