Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/06 05:23:47 UTC
svn commit: r1417734 - in /kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/client/ main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/javaapi/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala/kafka/...
Author: jkreps
Date: Thu Dec 6 04:23:44 2012
New Revision: 1417734
URL: http://svn.apache.org/viewvc?rev=1417734&view=rev
Log:
KAFKA-642 Fixes to protocol. Patch reviewed by Neha and Joel.
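At a high level, the patch threads a 4-byte correlation id through every request and response header, replaces full Broker serialization in topic metadata with broker-id references plus one deduplicated broker list, drops Broker's creatorId field, and gives the message format an explicit value-size field. As a minimal sketch (not project code) of the common header shape the requests converge on after this change, with the short-string convention from ApiUtils assumed to be an int16 length prefix:

    import java.nio.ByteBuffer

    // Sketch only: the fields and order every request shares after this patch --
    // versionId (int16), correlationId (int32, new), clientId (short string).
    // writeHeader/headerSize are hypothetical helpers, not Kafka API.
    object HeaderSketch {
      def writeHeader(buffer: ByteBuffer, versionId: Short, correlationId: Int, clientId: String) {
        buffer.putShort(versionId)
        buffer.putInt(correlationId)           // the field this patch adds
        val bytes = clientId.getBytes("UTF-8")
        buffer.putShort(bytes.length.toShort)  // short string: int16 length prefix
        buffer.put(bytes)
      }
      // 2 + 4 + (2 + clientId.length), matching the sizeInBytes arithmetic below.
      def headerSize(clientId: String): Int = 2 + 4 + 2 + clientId.length
    }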
Modified:
kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Thu Dec 6 04:23:44 2012
@@ -87,6 +87,7 @@ object LeaderAndIsrRequest {
def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val controllerEpoch = buffer.getInt
@@ -106,11 +107,12 @@ object LeaderAndIsrRequest {
for (i <- 0 until leadersCount)
leaders += Broker.readFrom(buffer)
- new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+ new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
}
}
case class LeaderAndIsrRequest (versionId: Short,
+ correlationId: Int,
clientId: String,
ackTimeoutMs: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
@@ -119,12 +121,13 @@ case class LeaderAndIsrRequest (versionI
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
- this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+ this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos, liveBrokers, controllerEpoch)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(controllerEpoch)
@@ -141,6 +144,7 @@ case class LeaderAndIsrRequest (versionI
def sizeInBytes(): Int = {
var size =
2 /* version id */ +
+ 4 /* correlation id */ +
(2 + clientId.length) /* client id */ +
4 /* ack timeout */ +
4 /* controller epoch */ +
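Each of these diffs keeps readFrom, writeTo, and sizeInBytes in lockstep; a field added to one but not the others shifts every subsequent field on the wire. The same correlation-id addition repeats mechanically in the OffsetRequest/Response, StopReplica, and TopicMetadata diffs that follow. A hedged round-trip check that would catch such a skew (request construction elided; this is not the project's actual test):

    import java.nio.ByteBuffer

    // Serialize, rewind, deserialize, compare: any asymmetry between writeTo
    // and readFrom (or a wrong sizeInBytes) breaks this.
    def roundTrip(request: LeaderAndIsrRequest): LeaderAndIsrRequest = {
      val buffer = ByteBuffer.allocate(request.sizeInBytes())
      request.writeTo(buffer)
      buffer.rewind()
      LeaderAndIsrRequest.readFrom(buffer)
    }
    // assert(roundTrip(someRequest) == someRequest)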
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala Thu Dec 6 04:23:44 2012
@@ -27,6 +27,7 @@ import collection.Map
object LeaderAndIsrResponse {
def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
@@ -36,18 +37,20 @@ object LeaderAndIsrResponse {
val partitionErrorCode = buffer.getShort
responseMap.put((topic, partition), partitionErrorCode)
}
- new LeaderAndIsrResponse(versionId, responseMap, errorCode)
+ new LeaderAndIsrResponse(versionId, correlationId, responseMap, errorCode)
}
}
case class LeaderAndIsrResponse(versionId: Short,
+ correlationId: Int,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse {
def sizeInBytes(): Int ={
var size =
2 /* version id */ +
+ 4 /* correlation id */ +
2 /* error code */ +
4 /* number of responses */
for ((key, value) <- responseMap) {
@@ -61,6 +64,7 @@ case class LeaderAndIsrResponse(versionI
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Thu Dec 6 04:23:44 2012
@@ -33,6 +33,7 @@ object OffsetRequest {
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val replicaId = buffer.getInt
val topicCount = buffer.getInt
@@ -54,16 +55,18 @@ case class PartitionOffsetRequestInfo(ti
case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
versionId: Short = OffsetRequest.CurrentVersion,
+ correlationId: Int = 0,
clientId: String = OffsetRequest.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId)
extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
- def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, replicaId)
+ def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(replicaId)
@@ -83,6 +86,7 @@ case class OffsetRequest(requestInfo: Ma
def sizeInBytes =
2 + /* versionId */
+ 4 + /* correlationId */
shortStringLength(clientId) +
4 + /* replicaId */
4 + /* topic count */
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Thu Dec 6 04:23:44 2012
@@ -26,6 +26,7 @@ object OffsetResponse {
def readFrom(buffer: ByteBuffer): OffsetResponse = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val numTopics = buffer.getInt
val pairs = (1 to numTopics).flatMap(_ => {
val topic = readShortString(buffer)
@@ -38,7 +39,7 @@ object OffsetResponse {
(TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
})
})
- OffsetResponse(versionId, Map(pairs:_*))
+ OffsetResponse(versionId, correlationId, Map(pairs:_*))
}
}
@@ -48,6 +49,7 @@ case class PartitionOffsetsResponse(erro
case class OffsetResponse(versionId: Short,
+ correlationId: Int,
partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
extends RequestOrResponse {
@@ -57,6 +59,7 @@ case class OffsetResponse(versionId: Sho
val sizeInBytes = {
2 + /* versionId */
+ 4 + /* correlation id */
4 + /* topic count */
offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, errorAndOffsetsMap) = currTopic
@@ -75,6 +78,7 @@ case class OffsetResponse(versionId: Sho
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
buffer.putInt(offsetsGroupedByTopic.size) // topic count
offsetsGroupedByTopic.foreach {
case((topic, errorAndOffsetsMap)) =>
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Thu Dec 6 04:23:44 2012
@@ -31,6 +31,7 @@ object StopReplicaRequest extends Loggin
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val controllerEpoch = buffer.getInt
@@ -45,11 +46,12 @@ object StopReplicaRequest extends Loggin
(1 to topicPartitionPairCount) foreach { _ =>
topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
}
- StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
+ StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
}
}
case class StopReplicaRequest(versionId: Short,
+ correlationId: Int,
clientId: String,
ackTimeoutMs: Int,
deletePartitions: Boolean,
@@ -58,12 +60,13 @@ case class StopReplicaRequest(versionId:
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
- this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+ this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
deletePartitions, partitions, controllerEpoch)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(controllerEpoch)
@@ -78,6 +81,7 @@ case class StopReplicaRequest(versionId:
def sizeInBytes(): Int = {
var size =
2 + /* versionId */
+ 4 + /* correlation id */
ApiUtils.shortStringLength(clientId) +
4 + /* ackTimeoutMs */
4 + /* controller epoch */
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Thu Dec 6 04:23:44 2012
@@ -27,6 +27,7 @@ import kafka.api.ApiUtils._
object StopReplicaResponse {
def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val errorCode = buffer.getShort
val numEntries = buffer.getInt
@@ -37,17 +38,19 @@ object StopReplicaResponse {
val partitionErrorCode = buffer.getShort()
responseMap.put((topic, partition), partitionErrorCode)
}
- new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
+ new StopReplicaResponse(versionId, correlationId, responseMap.toMap, errorCode)
}
}
case class StopReplicaResponse(val versionId: Short,
+ val correlationId: Int,
val responseMap: Map[(String, Int), Short],
val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
def sizeInBytes(): Int ={
var size =
2 /* version id */ +
+ 4 /* correlation id */ +
2 /* error code */ +
4 /* number of responses */
for ((key, value) <- responseMap) {
@@ -61,6 +64,7 @@ case class StopReplicaResponse(val versi
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Thu Dec 6 04:23:44 2012
@@ -21,57 +21,29 @@ import kafka.cluster.Broker
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
-import collection.mutable.ListBuffer
-import kafka.common.{KafkaException, ErrorMapping}
-
-/**
- * topic (2 bytes + topic.length)
- * number of partitions (4 bytes)
- *
- * partition id (4 bytes)
- *
- * does leader exist (1 byte)
- * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
- * number of replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
- * number of in sync replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
- *
- * does log metadata exist (1 byte)
- * number of log segments (4 bytes)
- * total size of log in bytes (8 bytes)
- *
- * number of log segments (4 bytes)
- * beginning offset (8 bytes)
- * last modified timestamp (8 bytes)
- * size of log segment (8 bytes)
- *
- */
-
-sealed trait LeaderRequest { def requestId: Byte }
-case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
-case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
+import collection.mutable.ArrayBuffer
+import kafka.common._
object TopicMetadata {
+
+ val NoLeaderNodeId = -1
- def readFrom(buffer: ByteBuffer): TopicMetadata = {
+ def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = {
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val topic = readShortString(buffer)
val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
- val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+ val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
for(i <- 0 until numPartitions)
- partitionsMetadata += PartitionMetadata.readFrom(buffer)
+ partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
new TopicMetadata(topic, partitionsMetadata, errorCode)
}
}
case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
def sizeInBytes: Int = {
- var size: Int = 2 /* error code */
- size += shortStringLength(topic)
- size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
- debug("Size of topic metadata = " + size)
- size
+ 2 /* error code */ +
+ shortStringLength(topic) +
+ 4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */
}
def writeTo(buffer: ByteBuffer) {
@@ -87,40 +59,24 @@ case class TopicMetadata(topic: String,
object PartitionMetadata {
- def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+ def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = {
val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
- val doesLeaderExist = getLeaderRequest(buffer.get)
- val leader = doesLeaderExist match {
- case LeaderExists => /* leader exists */
- Some(Broker.readFrom(buffer))
- case LeaderDoesNotExist => None
- }
+ val leaderId = buffer.getInt
+ val leader = brokers.get(leaderId)
/* list of all replicas */
- val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
- val replicas = new Array[Broker](numReplicas)
- for(i <- 0 until numReplicas) {
- replicas(i) = Broker.readFrom(buffer)
- }
+ val numReplicas = readIntInRange(buffer, "number of all replicas", (0, Int.MaxValue))
+ val replicaIds = (0 until numReplicas).map(_ => buffer.getInt)
+ val replicas = replicaIds.map(brokers)
/* list of in-sync replicas */
- val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
- val isr = new Array[Broker](numIsr)
- for(i <- 0 until numIsr) {
- isr(i) = Broker.readFrom(buffer)
- }
+ val numIsr = readIntInRange(buffer, "number of in-sync replicas", (0, Int.MaxValue))
+ val isrIds = (0 until numIsr).map(_ => buffer.getInt)
+ val isr = isrIds.map(brokers)
new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
}
-
- private def getLeaderRequest(requestId: Byte): LeaderRequest = {
- requestId match {
- case LeaderExists.requestId => LeaderExists
- case LeaderDoesNotExist.requestId => LeaderDoesNotExist
- case _ => throw new KafkaException("Unknown leader request id " + requestId)
- }
- }
}
case class PartitionMetadata(partitionId: Int,
@@ -129,42 +85,28 @@ case class PartitionMetadata(partitionId
isr: Seq[Broker] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging {
def sizeInBytes: Int = {
- var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
-
- leader match {
- case Some(l) => size += l.sizeInBytes
- case None =>
- }
-
- size += 2 /* number of replicas */
- size += replicas.foldLeft(0)(_ + _.sizeInBytes)
- size += 2 /* number of in sync replicas */
- size += isr.foldLeft(0)(_ + _.sizeInBytes)
-
- debug("Size of partition metadata = " + size)
- size
+ 2 /* error code */ +
+ 4 /* partition id */ +
+ 4 /* leader */ +
+ 4 + 4 * replicas.size /* replica array */ +
+ 4 + 4 * isr.size /* isr array */
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(errorCode)
buffer.putInt(partitionId)
- /* if leader exists*/
- leader match {
- case Some(l) =>
- buffer.put(LeaderExists.requestId)
- /* leader id host_name port */
- l.writeTo(buffer)
- case None => buffer.put(LeaderDoesNotExist.requestId)
- }
+ /* leader */
+ val leaderId = if(leader.isDefined) leader.get.id else TopicMetadata.NoLeaderNodeId
+ buffer.putInt(leaderId)
/* number of replicas */
- buffer.putShort(replicas.size.toShort)
- replicas.foreach(r => r.writeTo(buffer))
+ buffer.putInt(replicas.size)
+ replicas.foreach(r => buffer.putInt(r.id))
/* number of in-sync replicas */
- buffer.putShort(isr.size.toShort)
- isr.foreach(r => r.writeTo(buffer))
+ buffer.putInt(isr.size)
+ isr.foreach(r => buffer.putInt(r.id))
}
}
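This is the most substantial wire-format change: PartitionMetadata now refers to brokers by 4-byte id (with TopicMetadata.NoLeaderNodeId = -1 for a missing leader), and readFrom resolves ids against a broker map supplied by the enclosing response, instead of embedding a full Broker per leader, replica, and ISR entry. The resulting per-partition size, as a sketch matching the sizeInBytes arithmetic above:

    // Per-partition encoding after this patch:
    //   error code     int16
    //   partition id   int32
    //   leader id      int32  (-1 when no leader)
    //   replica count  int32, then one int32 broker id per replica
    //   isr count      int32, then one int32 broker id per in-sync replica
    def partitionMetadataSize(replicas: Int, isr: Int): Int =
      2 + 4 + 4 + (4 + 4 * replicas) + (4 + 4 * isr)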
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Thu Dec 6 04:23:44 2012
@@ -33,6 +33,7 @@ object TopicMetadataRequest extends Logg
def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val topics = new ListBuffer[String]()
@@ -40,26 +41,28 @@ object TopicMetadataRequest extends Logg
topics += readShortString(buffer)
val topicsList = topics.toList
debug("topic = %s".format(topicsList.head))
- new TopicMetadataRequest(versionId, clientId, topics.toList)
+ new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
}
}
case class TopicMetadataRequest(val versionId: Short,
val clientId: String,
- val topics: Seq[String])
+ val topics: Seq[String],
+ val correlationId: Int)
extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
def this(topics: Seq[String]) =
- this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
+ this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId) // correlation id not set yet
writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
}
def sizeInBytes(): Int = {
- 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+ 2 + 4 + shortStringLength(clientId) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala Thu Dec 6 04:23:44 2012
@@ -17,30 +17,46 @@
package kafka.api
+import kafka.cluster.Broker
import java.nio.ByteBuffer
object TopicMetadataResponse {
def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val brokerCount = buffer.getInt
+ val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+ val brokerMap = brokers.map(b => (b.id, b)).toMap
val topicCount = buffer.getInt
- val topicsMetadata = new Array[TopicMetadata](topicCount)
- for( i <- 0 until topicCount) {
- topicsMetadata(i) = TopicMetadata.readFrom(buffer)
- }
- new TopicMetadataResponse(versionId, topicsMetadata.toSeq)
+ val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
+ new TopicMetadataResponse(versionId, topicsMetadata, correlationId)
}
}
case class TopicMetadataResponse(versionId: Short,
- topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse
-{
- val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
+ topicsMetadata: Seq[TopicMetadata],
+ correlationId: Int) extends RequestOrResponse {
+ val sizeInBytes: Int = {
+ val brokers = extractBrokers(topicsMetadata).values
+ 2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
+ }
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ /* brokers */
+ val brokers = extractBrokers(topicsMetadata).values
+ buffer.putInt(brokers.size)
+ brokers.foreach(_.writeTo(buffer))
/* topic metadata */
buffer.putInt(topicsMetadata.length)
topicsMetadata.foreach(_.writeTo(buffer))
}
+
+ def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
+ val parts = topicsMetadata.flatMap(_.partitionsMetadata)
+ val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l}
+ brokers.map(b => (b.id, b)).toMap
+ }
}
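extractBrokers is what makes the id-based partition encoding work: it gathers every leader and replica across all partitions and keys them by broker id, so each broker is written once in the response no matter how many partitions reference it (note that, as committed, the body reads the class's topicsMetadata field rather than its topicMetadatas parameter). The dedup in isolation, where Broker is the three-field case class from the Broker.scala diff below:

    // Keying by id collapses duplicates; id uniquely identifies a broker, so
    // which duplicate wins is irrelevant.
    def dedupe(brokers: Seq[Broker]): Map[Int, Broker] =
      brokers.map(b => (b.id, b)).toMap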
Modified: kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala Thu Dec 6 04:23:44 2012
@@ -75,8 +75,7 @@ object ClientUtils extends Logging{
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
- val creatorId = hostName + "-" + System.currentTimeMillis()
- new Broker(brokerId, creatorId, hostName, port)
+ new Broker(brokerId, hostName, port)
})
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Thu Dec 6 04:23:44 2012
@@ -31,38 +31,32 @@ private[kafka] object Broker {
if(brokerInfoString == null)
throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
val brokerInfo = brokerInfoString.split(":")
- new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
+ new Broker(id, brokerInfo(0), brokerInfo(1).toInt)
}
def readFrom(buffer: ByteBuffer): Broker = {
val id = buffer.getInt
- val creatorId = readShortString(buffer)
val host = readShortString(buffer)
val port = buffer.getInt
- new Broker(id, creatorId, host, port)
+ new Broker(id, host, port)
}
}
-private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
- override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
+ override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
- def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
+ def getZkString(): String = host + ":" + port
- def getConnectionString(): String = new String(host + ":" + port)
+ def getConnectionString(): String = host + ":" + port
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
- writeShortString(buffer, creatorId)
writeShortString(buffer, host)
buffer.putInt(port)
}
- def sizeInBytes: Int = {
- val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
- debug("Size of broker info = " + size)
- size
- }
+ def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
override def equals(obj: Any): Boolean = {
obj match {
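With creatorId gone, a serialized Broker is just id (int32), host (short string), port (int32); for Broker(0, "localhost", 9092) that is 4 + (2 + 9) + 4 = 19 bytes. A one-line sketch of the trimmed sizeInBytes:

    // 4 (broker id) + 2 + host.length (short string) + 4 (port)
    def brokerSize(host: String): Int = 4 + (2 + host.length) + 4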
Modified: kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Dec 6 04:23:44 2012
@@ -30,8 +30,12 @@ import kafka.cluster.Broker
object SimpleConsumer extends Logging {
- def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
- clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
+ def earliestOrLatestOffset(broker: Broker,
+ topic: String,
+ partitionId: Int,
+ earliestOrLatest: Long,
+ clientId: String,
+ isFromOrdinaryConsumer: Boolean): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
@@ -42,7 +46,7 @@ object SimpleConsumer extends Logging {
new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
else
new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
- Request.DebuggingConsumerId)
+ 0, Request.DebuggingConsumerId)
producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
} catch {
case e =>
@@ -55,8 +59,13 @@ object SimpleConsumer extends Logging {
producedOffset
}
- def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
- earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
+ def earliestOrLatestOffset(zkClient: ZkClient,
+ topic: String,
+ brokerId: Int,
+ partitionId: Int,
+ earliestOrLatest: Long,
+ clientId: String,
+ isFromOrdinaryConsumer: Boolean = true): Long = {
val cluster = getCluster(zkClient)
val broker = cluster.getBroker(brokerId) match {
case Some(b) => b
Modified: kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala Thu Dec 6 04:23:44 2012
@@ -20,14 +20,15 @@ import kafka.api._
import java.nio.ByteBuffer
import scala.collection.JavaConversions
-class TopicMetadataRequest(val versionId: Short,
+class TopicMetadataRequest(val correlationId: Int,
+ val versionId: Short,
val clientId: String,
val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
val underlying: kafka.api.TopicMetadataRequest =
- new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics))
+ new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId)
def this(topics: java.util.List[String]) =
- this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+ this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
Modified: kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Thu Dec 6 04:23:44 2012
@@ -38,12 +38,15 @@ object Message {
val KeySizeOffset = AttributesOffset + AttributesLength
val KeySizeLength = 4
val KeyOffset = KeySizeOffset + KeySizeLength
- val MessageOverhead = KeyOffset
+ val ValueSizeLength = 4
+
+ /** The amount of overhead bytes in a message */
+ val MessageOverhead = KeyOffset + ValueSizeLength
/**
* The minimum valid size for the message header
*/
- val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength
+ val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
/**
* The current "magic" value
@@ -97,22 +100,24 @@ class Message(val buffer: ByteBuffer) {
Message.AttributesLength +
Message.KeySizeLength +
(if(key == null) 0 else key.length) +
+ Message.ValueSizeLength +
(if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)))
// skip crc, we will fill that in at the end
- buffer.put(MagicOffset, CurrentMagicValue)
- var attributes:Byte = 0
+ buffer.position(MagicOffset)
+ buffer.put(CurrentMagicValue)
+ var attributes: Byte = 0
if (codec.codec > 0)
attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
- buffer.put(AttributesOffset, attributes)
+ buffer.put(attributes)
if(key == null) {
- buffer.putInt(KeySizeOffset, -1)
- buffer.position(KeyOffset)
+ buffer.putInt(-1)
} else {
- buffer.putInt(KeySizeOffset, key.length)
- buffer.position(KeyOffset)
+ buffer.putInt(key.length)
buffer.put(key, 0, key.length)
}
- buffer.put(bytes, payloadOffset, if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)
+ val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset
+ buffer.putInt(size)
+ buffer.put(bytes, payloadOffset, size)
buffer.rewind()
// now compute the checksum and fill it in
@@ -171,9 +176,14 @@ class Message(val buffer: ByteBuffer) {
def hasKey: Boolean = keySize >= 0
/**
+ * The position where the payload size is stored
+ */
+ private def payloadSizeOffset = Message.KeyOffset + max(0, keySize)
+
+ /**
* The length of the message value in bytes
*/
- def payloadSize: Int = size - KeyOffset - max(0, keySize)
+ def payloadSize: Int = buffer.getInt(payloadSizeOffset)
/**
* The magic version of this message
@@ -194,29 +204,27 @@ class Message(val buffer: ByteBuffer) {
/**
* A ByteBuffer containing the content of the message
*/
- def payload: ByteBuffer = {
- var payload = buffer.duplicate
- payload.position(KeyOffset + max(keySize, 0))
- payload = payload.slice()
- payload.limit(payloadSize)
- payload.rewind()
- payload
- }
+ def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
/**
* A ByteBuffer containing the message key
*/
- def key: ByteBuffer = {
- val s = keySize
- if(s < 0) {
+ def key: ByteBuffer = sliceDelimited(KeySizeOffset)
+
+ /**
+ * Read a size-delimited byte buffer starting at the given offset
+ */
+ private def sliceDelimited(start: Int): ByteBuffer = {
+ val size = buffer.getInt(start)
+ if(size < 0) {
null
} else {
- var key = buffer.duplicate
- key.position(KeyOffset)
- key = key.slice()
- key.limit(s)
- key.rewind()
- key
+ var b = buffer.duplicate
+ b.position(start + 4)
+ b = b.slice()
+ b.limit(size)
+ b.rewind
+ b
}
}
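The message change makes each message self-describing: a 4-byte value size now follows the key, so payloadSize is read from the buffer instead of being derived as size - KeyOffset - max(0, keySize), and key and payload can share the one sliceDelimited helper. The resulting layout, as a sketch:

    // Message layout after this patch (sizes in bytes):
    //   crc 4 | magic 1 | attributes 1 | key size 4 | key max(0, keySize)
    //   | value size 4 (new) | value
    // MessageOverhead = 4 + 1 + 1 + 4 + 4 = 14.
    def messageSize(keySize: Int, valueSize: Int): Int =
      14 + math.max(0, keySize) + valueSize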
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Thu Dec 6 04:23:44 2012
@@ -33,7 +33,7 @@ class ProducerConfig private (val props:
/** This is for bootstrapping and the producer will only use it for getting metadata
* (topics, partitions and replicas). The socket connections for sending the actual data
* will be established based on the broker information returned in the metadata. The
- * format is host1:por1,host2:port2, and the list can be a subset of brokers or
+ * format is host1:port1,host2:port2, and the list can be a subset of brokers or
* a VIP pointing to a subset of brokers.
*/
val brokerList = props.getString("broker.list")
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Thu Dec 6 04:23:44 2012
@@ -41,9 +41,6 @@ trait SyncProducerConfigShared {
val maxMessageSize = props.getInt("max.message.size", 1000000)
/* the client application sending the producer requests */
- val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
-
- /* the client application sending the producer requests */
val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
/*
@@ -61,7 +58,6 @@ trait SyncProducerConfigShared {
}
object SyncProducerConfig {
- val DefaultCorrelationId = -1
val DefaultClientId = ""
val DefaultRequiredAcks : Short = 0
val DefaultAckTimeoutMs = 1500
Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Dec 6 04:23:44 2012
@@ -39,7 +39,8 @@ class DefaultEventHandler[K,V](config: P
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
- val counter = new AtomicInteger(0)
+ val partitionCounter = new AtomicInteger(0)
+ val correlationCounter = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
private val lock = new Object()
@@ -191,7 +192,7 @@ class DefaultEventHandler[K,V](config: P
"\n Valid values are > 0")
val partition =
if(key == null)
- Utils.abs(counter.getAndIncrement()) % numPartitions
+ Utils.abs(partitionCounter.getAndIncrement()) % numPartitions
else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
@@ -212,7 +213,7 @@ class DefaultEventHandler[K,V](config: P
warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
- val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
+ val producerRequest = new ProducerRequest(correlationCounter.getAndIncrement(), config.clientId, config.requiredAcks,
config.requestTimeoutMs, messagesPerTopic)
try {
val syncProducer = producerPool.getProducer(brokerId)
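On the producer side the correlation id is no longer a fixed value from config (the producer.request.correlation_id property is removed in the SyncProducerConfig diff below); DefaultEventHandler draws a fresh id per request from its own AtomicInteger, alongside the renamed partitionCounter used for round-robin partitioning. The pattern in isolation:

    import java.util.concurrent.atomic.AtomicInteger

    // One counter per handler: ids are unique within the handler's lifetime,
    // which is enough to match a response to the request that caused it.
    val correlationCounter = new AtomicInteger(0)
    def nextCorrelationId(): Int = correlationCounter.getAndIncrement()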
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Dec 6 04:23:44 2012
@@ -88,7 +88,7 @@ class KafkaApis(val requestChannel: Requ
case (topicAndPartition, partitionOffsetRequest) =>
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
}
- val errorResponse = OffsetResponse(apiRequest.versionId, partitionOffsetResponseMap)
+ val errorResponse = OffsetResponse(apiRequest.versionId, apiRequest.correlationId, partitionOffsetResponseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
error("error when handling request %s".format(apiRequest), e)
case RequestKeys.MetadataKey =>
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: Requ
val topicMeatadata = apiRequest.topics.map {
topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
- val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata)
+ val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
error("error when handling request %s".format(apiRequest), e)
case RequestKeys.LeaderAndIsrKey =>
@@ -104,7 +104,7 @@ class KafkaApis(val requestChannel: Requ
val responseMap = apiRequest.partitionStateInfos.map {
case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
- val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap)
+ val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
error("error when handling request %s".format(apiRequest), e)
case RequestKeys.StopReplicaKey =>
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: Requ
case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}.toMap
error("error when handling request %s".format(apiRequest), e)
- val errorResponse = StopReplicaResponse(apiRequest.versionId, responseMap)
+ val errorResponse = StopReplicaResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
} finally
@@ -127,7 +127,7 @@ class KafkaApis(val requestChannel: Requ
trace("Handling leader and ISR request " + leaderAndIsrRequest)
try {
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
- val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
+ val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, response, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
@@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: Requ
trace("Handling stop replica request " + stopReplicaRequest)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
- val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
+ val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, response.toMap, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
@@ -409,7 +409,7 @@ class KafkaApis(val requestChannel: Requ
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
}
})
- val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
+ val response = OffsetResponse(OffsetRequest.CurrentVersion, offsetRequest.correlationId, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
@@ -458,7 +458,7 @@ class KafkaApis(val requestChannel: Requ
}
})
topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
- val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+ val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Thu Dec 6 04:23:44 2012
@@ -43,8 +43,7 @@ class KafkaZooKeeper(config: KafkaConfig
private def registerBrokerInZk() {
info("Registering broker " + brokerIdPath)
val hostName = config.hostName
- val creatorId = hostName + "-" + System.currentTimeMillis
- ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
+ ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
}
/**
Modified: kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Dec 6 04:23:44 2012
@@ -180,9 +180,9 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
+ def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
- val broker = new Broker(id, creator, host, port)
+ val broker = new Broker(id, host, port)
try {
createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
} catch {
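Broker registration shrinks accordingly: the ephemeral node at ZkUtils.BrokerIdsPath + "/" + id now stores just host:port (getZkString) rather than creatorId:host:port, e.g.:

    val broker = new Broker(0, "localhost", 9092)   // three-field Broker from this patch
    assert(broker.getZkString == "localhost:9092")  // previously "creatorId:host:port"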
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Thu Dec 6 04:23:44 2012
@@ -75,10 +75,11 @@ object SerializationTestUtils{
TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
)
- private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
- private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
- private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
- private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+ private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
+ private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
+ private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
+ private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
+ private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3)
private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -94,7 +95,7 @@ object SerializationTestUtils{
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
- new LeaderAndIsrResponse(1, responseMap)
+ new LeaderAndIsrResponse(1, 1, responseMap)
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
@@ -104,7 +105,7 @@ object SerializationTestUtils{
def createTestStopReplicaResponse() : StopReplicaResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
- new StopReplicaResponse(1, responseMap.toMap)
+ new StopReplicaResponse(1, 0, responseMap.toMap)
}
def createTestProducerRequest: ProducerRequest = {
@@ -131,17 +132,17 @@ object SerializationTestUtils{
)
def createTestOffsetResponse: OffsetResponse = {
- new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+ new OffsetResponse(OffsetRequest.CurrentVersion, 0, collection.immutable.Map(
TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
)
}
def createTestTopicMetadataRequest: TopicMetadataRequest = {
- new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
+ new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1)
}
def createTestTopicMetadataResponse: TopicMetadataResponse = {
- new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
+ new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1)
}
}
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala Thu Dec 6 04:23:44 2012
@@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit
val group = "group1"
val consumer0 = "consumer0"
val consumedOffset = 5
- val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+ val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
c.brokerId,
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Thu Dec 6 04:23:44 2012
@@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite wi
yield new KafkaConfig(props)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"
- val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+ val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Dec 6 04:23:44 2012
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite
log.flush()
val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite
replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
// try to fetch using latest offset
val fetchResponse = simpleConsumer.fetch(
@@ -157,14 +157,14 @@ class LogOffsetTest extends JUnit3Suite
val now = time.milliseconds
val offsets = log.getOffsetsBefore(now, 10)
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
val topicAndPartition = TopicAndPartition(topic, part)
val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
- assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
}
@Test
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu Dec 6 04:23:44 2012
@@ -74,7 +74,7 @@ class SocketServerTest extends JUnitSuit
@Test
def simpleRequest() {
val socket = connect()
- val correlationId = SyncProducerConfig.DefaultCorrelationId
+ val correlationId = -1
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Dec 6 04:23:44 2012
@@ -168,8 +168,8 @@ class AsyncProducerTest extends JUnit3Su
val props = new Properties()
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
- val broker1 = new Broker(0, "localhost", "localhost", 9092)
- val broker2 = new Broker(1, "localhost", "localhost", 9093)
+ val broker1 = new Broker(0, "localhost", 9092)
+ val broker2 = new Broker(1, "localhost", 9093)
broker1
// form expected partitions metadata
val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -427,17 +427,18 @@ class AsyncProducerTest extends JUnit3Su
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
- val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
+ val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0)
+ val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1)
val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
- val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
+ val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2)
val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
- EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
- EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2)
+ EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
+ EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
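
ProducerRequest is a case class, so EasyMock matches each send expectation by value equality; giving every attempt its own correlation id is what makes the three otherwise-identical requests distinguishable. A self-contained sketch of that distinction (topic and message contents are placeholders):

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.utils.TestUtils

    val msgs = new ByteBufferMessageSet(NoCompressionCodec, new Message("m".getBytes))
    val attempt1 = TestUtils.produceRequestWithAcks(Seq("topic1"), Seq(0, 1), msgs, acks = 0, correlationId = 0)
    val attempt2 = TestUtils.produceRequestWithAcks(Seq("topic1"), Seq(0, 1), msgs, acks = 0, correlationId = 1)
    assert(attempt1 != attempt2)   // distinct attempts, distinct mock expectations
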
@@ -510,7 +511,7 @@ class AsyncProducerTest extends JUnit3Su
}
private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
- val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
+ val broker1 = new Broker(brokerId, brokerHost, brokerPort)
new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
}
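
Throughout the tests the Broker constructor drops its separate creator-id host; a broker is now just id, host, and port. A sketch of building topic metadata with the three-argument form (names and addresses are placeholders):

    import kafka.api.{PartitionMetadata, TopicMetadata}
    import kafka.cluster.Broker

    val broker1 = new Broker(0, "localhost", 9092)   // id, host, port; no creatorId
    val broker2 = new Broker(1, "localhost", 9093)
    val metadata = new TopicMetadata("test",
      Seq(new PartitionMetadata(0, Some(broker1), List(broker1, broker2))))
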
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Dec 6 04:23:44 2012
@@ -81,7 +81,7 @@ class SyncProducerTest extends JUnit3Sui
props.put("connect.timeout.ms", "300")
props.put("reconnect.interval", "500")
props.put("max.message.size", "100")
- val correlationId = SyncProducerConfig.DefaultCorrelationId
+ val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
@@ -98,9 +98,7 @@ class SyncProducerTest extends JUnit3Sui
val props = new Properties()
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
+ props.put("max.message.size", 50000.toString)
val producer = new SyncProducer(new SyncProducerConfig(props))
CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
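
The trimmed properties above leave only what this test needs. A minimal SyncProducer setup with the same keys (host and port are placeholders):

    import java.util.Properties
    import kafka.producer.{SyncProducer, SyncProducerConfig}

    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", "9092")
    props.put("max.message.size", "50000")   // cap on a single serialized message
    val producer = new SyncProducer(new SyncProducerConfig(props))
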
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Thu Dec 6 04:23:44 2012
@@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3S
// start another controller
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
- val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+ val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
controllerChannelManager.startup()
val staleControllerEpoch = 0
Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1417734&r1=1417733&r2=1417734&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Dec 6 04:23:44 2012
@@ -24,6 +24,7 @@ import java.nio.channels._
import java.util.Random
import java.util.Properties
import junit.framework.Assert._
+import kafka.api._
import kafka.server._
import kafka.producer._
import kafka.message._
@@ -333,13 +334,13 @@ object TestUtils extends Logging {
}
def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
- val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
- brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port))
+ val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+ brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port))
brokers
}
def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
- val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+ val brokers = ids.map(id => new Broker(id, "localhost", 6667))
brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b.id))
brokers
}
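
With the creator id gone from Broker, ZooKeeper registration narrows to id/host/port as well. A sketch of the register/delete round trip these helpers perform (ZkClient construction omitted):

    import kafka.cluster.Broker
    import kafka.utils.ZkUtils
    import org.I0Itec.zkclient.ZkClient

    // Register, then remove, one broker's ZK node under the broker ids path.
    def registerAndDelete(zkClient: ZkClient, b: Broker) {
      ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port)
      ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b.id)
    }
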
@@ -354,22 +355,27 @@ object TestUtils extends Logging {
/**
* Create a wire-format request from basic information
*/
- def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
- produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
- }
-
- def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
- produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
- }
-
- def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = {
- val correlationId = SyncProducerConfig.DefaultCorrelationId
- val clientId = SyncProducerConfig.DefaultClientId
- val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+ def produceRequest(topic: String,
+ partition: Int,
+ message: ByteBufferMessageSet,
+ acks: Int = SyncProducerConfig.DefaultRequiredAcks,
+ timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+ correlationId: Int = 0,
+ clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+ produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
+ }
+
+ def produceRequestWithAcks(topics: Seq[String],
+ partitions: Seq[Int],
+ message: ByteBufferMessageSet,
+ acks: Int = SyncProducerConfig.DefaultRequiredAcks,
+ timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+ correlationId: Int = 0,
+ clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
val data = topics.flatMap(topic =>
partitions.map(partition => (TopicAndPartition(topic, partition), message))
)
- new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
+ new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
}
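
With defaults on everything except topic, partition, and message set, call sites name only what they override, as the per-attempt correlation ids in the AsyncProducerTest hunk above do. A sketch (topic and message contents are placeholders):

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.utils.TestUtils

    val msgs = new ByteBufferMessageSet(NoCompressionCodec, new Message("m".getBytes))
    // Single topic/partition, overriding only the correlation id:
    val req = TestUtils.produceRequest("topic1", 0, msgs, correlationId = 2)
    // Multiple partitions, fire-and-forget acks, explicit correlation id:
    val multi = TestUtils.produceRequestWithAcks(Seq("topic1"), Seq(0, 1), msgs, acks = 0, correlationId = 1)
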
def makeLeaderForPartition(zkClient: ZkClient, topic: String,