You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:22:01 UTC
[24/37] git commit: KAFKA-736 Add an option to the 0.8 producer to
mimic 0.7 producer behavior; reviewed by Jun Rao and Sriram Subramanian
KAFKA-736 Add an option to the 0.8 producer to mimic 0.7 producer behavior; reviewed by Jun Rao and Sriram Subramanian
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d8fb1ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d8fb1ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d8fb1ee
Branch: refs/heads/trunk
Commit: 4d8fb1eebc043fab11c58d3309e93cc83ef24a89
Parents: 828ce83
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Feb 22 17:48:10 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 22 17:48:10 2013 -0800
----------------------------------------------------------------------
config/log4j.properties | 1 +
.../main/scala/kafka/api/ProducerResponse.scala | 2 -
.../main/scala/kafka/network/RequestChannel.scala | 2 +-
.../main/scala/kafka/network/SocketServer.scala | 30 +++++--
.../scala/kafka/producer/BrokerPartitionInfo.scala | 2 +-
.../scala/kafka/producer/ConsoleProducer.scala | 10 ++-
.../scala/kafka/producer/KafkaLog4jAppender.scala | 13 ++-
core/src/main/scala/kafka/producer/Producer.scala | 2 +-
.../main/scala/kafka/producer/SyncProducer.scala | 17 +++-
.../kafka/producer/async/DefaultEventHandler.scala | 30 ++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 9 ++-
.../unit/kafka/integration/PrimitiveApiTest.scala | 48 ++++++++---
.../integration/ProducerConsumerTestHarness.scala | 14 +---
.../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +-
.../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 7 +-
.../unit/kafka/network/SocketServerTest.scala | 36 +++++++-
.../unit/kafka/producer/AsyncProducerTest.scala | 13 ++--
.../scala/unit/kafka/producer/ProducerTest.scala | 3 +-
.../unit/kafka/producer/SyncProducerTest.scala | 69 +++++++--------
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +-
.../unit/kafka/server/ServerShutdownTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++--
22 files changed, 215 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index e104751..5692da0 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -47,6 +47,7 @@ log4j.logger.kafka=INFO, kafkaAppender
log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.request.logger=TRACE, requestAppender
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 743227d..5bff709 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -22,7 +22,6 @@ import scala.collection.Map
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
-
object ProducerResponse {
def readFrom(buffer: ByteBuffer): ProducerResponse = {
val correlationId = buffer.getInt
@@ -44,7 +43,6 @@ object ProducerResponse {
case class ProducerResponseStatus(error: Short, offset: Long)
-
case class ProducerResponse(correlationId: Int,
status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7747ddd..931092d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -116,7 +116,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
/** Get the next request or block until there is one */
- def receiveRequest(): RequestChannel.Request =
+ def receiveRequest(): RequestChannel.Request =
requestQueue.take()
/** Get a response for the given processor if there is one */
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5a24f6..648d936 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -35,7 +35,7 @@ import kafka.utils._
class SocketServer(val brokerId: Int,
val host: String,
val port: Int,
- val numProcessorThreads: Int,
+ val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val maxRequestSize: Int = Int.MaxValue) extends Logging {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
@@ -206,7 +206,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
* each of which has its own selectors
*/
private[kafka] class Processor(val id: Int,
- val time: Time,
+ val time: Time,
val maxRequestSize: Int,
val requestChannel: RequestChannel) extends AbstractServerThread {
@@ -219,7 +219,9 @@ private[kafka] class Processor(val id: Int,
configureNewConnections()
// register any new responses for writing
processNewResponses()
+ val startSelectTime = SystemTime.milliseconds
val ready = selector.select(300)
+ trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms")
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
@@ -259,11 +261,21 @@ private[kafka] class Processor(val id: Int,
private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
- trace("Socket server received response to send, registering for write: " + curr)
val key = curr.request.requestKey.asInstanceOf[SelectionKey]
try {
- key.interestOps(SelectionKey.OP_WRITE)
- key.attach(curr)
+ if(curr.responseSend == null) {
+ // a null response send object indicates that there is no response to send to the client.
+ // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
+ // that are sitting in the server's socket buffer
+ trace("Socket server received empty response to send, registering for read: " + curr)
+ key.interestOps(SelectionKey.OP_READ)
+ key.attach(null)
+ curr.request.updateRequestMetrics
+ } else {
+ trace("Socket server received response to send, registering for write: " + curr)
+ key.interestOps(SelectionKey.OP_WRITE)
+ key.attach(curr)
+ }
} catch {
case e: CancelledKeyException => {
debug("Ignoring response for closed socket.")
@@ -298,7 +310,7 @@ private[kafka] class Processor(val id: Int,
private def configureNewConnections() {
while(newConnections.size() > 0) {
val channel = newConnections.poll()
- debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress)
+ debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
channel.register(selector, SelectionKey.OP_READ)
}
}
@@ -321,10 +333,12 @@ private[kafka] class Processor(val id: Int,
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)
- trace("Received request, sending for processing by handler: " + req)
key.attach(null)
+ // explicitly reset interest ops to not READ, no need to wake up the selector just yet
+ key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
+ trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
@@ -344,8 +358,10 @@ private[kafka] class Processor(val id: Int,
if(responseSend.complete) {
response.request.updateRequestMetrics()
key.attach(null)
+ trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
} else {
+ trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_WRITE)
wakeup()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index b209a97..a0e2b44 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
if(tmd.errorCode == ErrorMapping.NoError){
topicPartitionInfo.put(tmd.topic, tmd)
} else
- warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+ warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
tmd.partitionsMetadata.foreach(pmd =>{
if (pmd.errorCode != ErrorMapping.NoError){
debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8b77465..eebfda6 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -87,7 +87,12 @@ object ConsoleProducer {
.describedAs("reader_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[LineMessageReader].getName)
- val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
+ val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1024*100)
+ val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
"This allows custom configuration for a user-defined message reader.")
.withRequiredArg
.describedAs("prop")
@@ -116,6 +121,7 @@ object ConsoleProducer {
val keyEncoderClass = options.valueOf(keyEncoderOpt)
val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
+ val socketBuffer = options.valueOf(socketBufferSizeOpt)
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
cmdLineProps.put("topic", topic)
@@ -133,7 +139,7 @@ object ConsoleProducer {
props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)
-
+ props.put("send.buffer.bytes", socketBuffer.toString)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index af077e0..3d22e6d 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var compressionCodec:String = null
var enqueueTimeout:String = null
var queueSize:String = null
+ var requiredNumAcks: Int = Int.MaxValue
private var producer: Producer[String, String] = null
@@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
def getBrokerList:String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
-
+
def getSerializerClass:String = serializerClass
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
def getProducerType:String = producerType
def setProducerType(producerType:String) { this.producerType = producerType }
-
+
def getCompressionCodec:String = compressionCodec
def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
-
+
def getEnqueueTimeout:String = enqueueTimeout
def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
def getQueueSize:String = queueSize
def setQueueSize(queueSize:String) { this.queueSize = queueSize }
+ def getRequiredNumAcks:Int = requiredNumAcks
+ def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
+
override def activateOptions() {
// check for config parameter validity
val props = new Properties()
@@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
if(compressionCodec != null) props.put("compression.codec", compressionCodec)
if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
+ if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + config.brokerList)
LogLog.debug("Logging for topic: " + topic)
}
-
+
override def append(event: LoggingEvent) {
val message : String = if( this.layout == null) {
event.getRenderedMessage
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 66638f2..3ded46e 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -26,7 +26,7 @@ import kafka.common.QueueFullException
import kafka.metrics._
-class Producer[K,V](config: ProducerConfig,
+class Producer[K,V](val config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // only for unit testing
extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 0469a39..306f200 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
/**
* Common functionality for the public send methods
*/
- private def doSend(request: RequestOrResponse): Receive = {
+ private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
lock synchronized {
verifyRequest(request)
getOrMakeConnection()
@@ -70,7 +70,10 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
var response: Receive = null
try {
blockingChannel.send(request)
- response = blockingChannel.receive()
+ if(readResponse)
+ response = blockingChannel.receive()
+ else
+ trace("Skipping reading response")
} catch {
case e: java.io.IOException =>
// no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
@@ -83,7 +86,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
}
/**
- * Send a message
+ * Send a message. If the producerRequest had required.request.acks=0, then the
+ * returned response object is null
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
val requestSize = producerRequest.sizeInBytes
@@ -95,10 +99,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
aggregateTimer.time {
specificTimer.time {
- response = doSend(producerRequest)
+ response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true)
}
}
- ProducerResponse.readFrom(response.buffer)
+ if(producerRequest.requiredAcks != 0)
+ ProducerResponse.readFrom(response.buffer)
+ else
+ null
}
def send(request: TopicMetadataRequest): TopicMetadataResponse = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 5569cc2..ebab1da 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -243,20 +243,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val response = syncProducer.send(producerRequest)
debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
- if (response.status.size != producerRequest.data.size)
- throw new KafkaException("Incomplete response (%s) for producer request (%s)"
- .format(response, producerRequest))
- if (logger.isTraceEnabled) {
- val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
- successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
- trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
- }
- failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
- .map(partitionStatus => partitionStatus._1)
- if(failedTopicPartitions.size > 0)
- error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
- .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
- failedTopicPartitions
+ if(response != null) {
+ if (response.status.size != producerRequest.data.size)
+ throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
+ if (logger.isTraceEnabled) {
+ val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
+ successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
+ trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+ }
+ failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+ .map(partitionStatus => partitionStatus._1)
+ if(failedTopicPartitions.size > 0)
+ error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
+ .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+ failedTopicPartitions
+ } else
+ Seq.empty[TopicAndPartition]
} catch {
case t: Throwable =>
warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6df077b..ece1b46 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,13 @@ import kafka.message._
import kafka.network._
import org.apache.log4j.Logger
import scala.collection._
-import kafka.network.RequestChannel.Response
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
import org.I0Itec.zkclient.ZkClient
import kafka.common._
import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
+import kafka.network.RequestChannel.Response
/**
@@ -127,8 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val allPartitionHaveReplicationFactorOne =
!produceRequest.data.keySet.exists(
m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
- if (produceRequest.requiredAcks == 0 ||
- produceRequest.requiredAcks == 1 ||
+ if(produceRequest.requiredAcks == 0) {
+ // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
+ // and is tuned for very high throughput
+ requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+ } else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
allPartitionHaveReplicationFactorOne ||
numPartitionsInError == produceRequest.numPartitions) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 402fced..007e85d 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -21,11 +21,7 @@ import java.nio.ByteBuffer
import junit.framework.Assert._
import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import java.util.Properties
-import kafka.utils.Utils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer._
-import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
@@ -33,6 +29,7 @@ import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.admin.{AdminUtils, CreateTopicCommand}
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
+import kafka.utils.{TestUtils, Utils}
/**
* End to end tests of the primitive apis against a local server
@@ -83,9 +80,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ val props = producer.config.props.props
val config = new ProducerConfig(props)
val stringProducer1 = new Producer[String, String](config)
@@ -111,9 +106,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
def testDefaultEncoderProducerAndFetchWithCompression() {
val topic = "test-topic"
- val props = new Properties()
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ val props = producer.config.props.props
props.put("compression", "true")
val config = new ProducerConfig(props)
@@ -272,7 +265,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
producer.send(produceList: _*)
- // wait a bit for produced message to be available
val request = builder.build()
val response = consumer.fetch(request)
for( (topic, partition) <- topics) {
@@ -315,6 +307,40 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
}
+ def testPipelinedProduceRequests() {
+ createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+ val props = producer.config.props.props
+ props.put("request.required.acks", "0")
+ val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
+
+ // send some messages
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
+ val messages = new mutable.HashMap[String, Seq[String]]
+ val builder = new FetchRequestBuilder()
+ var produceList: List[KeyedMessage[String, String]] = Nil
+ for( (topic, partition) <- topics) {
+ val messageList = List("a_" + topic, "b_" + topic)
+ val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
+ messages += topic -> messageList
+ pipelinedProducer.send(producerData:_*)
+ builder.addFetch(topic, partition, 0, 10000)
+ }
+
+ // wait until the messages are published
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000)
+ TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000)
+
+ // test if the consumer received the messages in the correct order when producer has enabled request pipelining
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val fetched = response.messageSet(topic, partition)
+ assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
+ }
+ }
+
/**
* For testing purposes, just create these topics each with one partition and one replica for
* which the provided broker should the leader for. Create and wait for broker to lead. Simple.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 0fde254..731ee59 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -19,11 +19,8 @@ package kafka.integration
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
import kafka.producer.{ProducerConfig, Producer}
import kafka.utils.TestUtils
-import kafka.serializer._
-
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
@@ -32,16 +29,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
override def setUp() {
super.setUp
- val props = new Properties()
- props.put("partitioner.class", "kafka.utils.StaticPartitioner")
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
- props.put("send.buffer.bytes", "65536")
- props.put("connect.timeout.ms", "100000")
- props.put("reconnect.interval", "10000")
- props.put("retry.backoff.ms", "1000")
- props.put("message.send.max.retries", "3")
- props.put("request.required.acks", "-1")
- props.put("serializer.class", classOf[StringEncoder].getName.toString)
+ val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 051ebe3..3b2c069 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -24,7 +24,7 @@ import org.junit._
import org.scalatest.junit.JUnitSuite
import scala.collection._
import scala.util.Random
-import kafka.utils._
+import kafka.utils.TestUtils
class OffsetIndexTest extends JUnitSuite {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index c25255f..67497dd 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
var logDirZk: File = null
var config: KafkaConfig = null
- var serverZk: KafkaServer = null
+ var server: KafkaServer = null
var simpleConsumerZk: SimpleConsumer = null
@@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
val logDirZkPath = propsZk.getProperty("log.dir")
logDirZk = new File(logDirZkPath)
config = new KafkaConfig(propsZk)
- serverZk = TestUtils.createServer(config);
+ server = TestUtils.createServer(config);
simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
}
@After
override def tearDown() {
simpleConsumerZk.close
- serverZk.shutdown
+ server.shutdown
Utils.rm(logDirZk)
super.tearDown()
}
@@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put("log4j.appender.KAFKA.Topic", "test-topic")
props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
+ props.put("log4j.appender.KAFKA.requiredNumAcks", "1")
props
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7395cbc..9322b2c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,7 +21,6 @@ import java.net._
import java.io._
import org.junit._
import org.scalatest.junit.JUnitSuite
-import kafka.utils.TestUtils
import java.util.Random
import junit.framework.Assert._
import kafka.producer.SyncProducerConfig
@@ -29,13 +28,14 @@ import kafka.api.ProducerRequest
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
+import java.nio.channels.SelectionKey
class SocketServerTest extends JUnitSuite {
val server: SocketServer = new SocketServer(0,
host = null,
- port = TestUtils.choosePort,
+ port = kafka.utils.TestUtils.choosePort,
numProcessorThreads = 1,
maxQueuedRequests = 50,
maxRequestSize = 50)
@@ -102,4 +102,36 @@ class SocketServerTest extends JUnitSuite {
receiveResponse(socket)
}
+ @Test
+ def testPipelinedRequestOrdering() {
+ val socket = connect()
+ val correlationId = -1
+ val clientId = SyncProducerConfig.DefaultClientId
+ val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+ val ack: Short = 0
+ val emptyRequest =
+ new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+
+ val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+ emptyRequest.writeTo(byteBuffer)
+ byteBuffer.rewind()
+ val serializedBytes = new Array[Byte](byteBuffer.remaining)
+ byteBuffer.get(serializedBytes)
+
+ sendRequest(socket, 0, serializedBytes)
+ sendRequest(socket, 0, serializedBytes)
+
+ // here the socket server should've read only the first request completely and since the response is not sent yet
+ // the selection key should not be readable
+ val request = server.requestChannel.receiveRequest
+ Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ)
+
+ server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null))
+
+ // if everything is working correctly, until you send a response for the first request,
+ // the 2nd request will not be read by the socket server
+ val request2 = server.requestChannel.receiveRequest
+ server.requestChannel.sendResponse(new RequestChannel.Response(0, request2, null))
+ Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index fb0666f..922a200 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -356,11 +356,9 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testBrokerListAndAsync() {
return
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
+ val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.type", "async")
props.put("batch.num.messages", "5")
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
@@ -394,9 +392,10 @@ class AsyncProducerTest extends JUnit3Suite {
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
+ props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
val config = new ProducerConfig(props)
@@ -410,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite {
// produce request for topic1 and partitions 0 and 1. Let the first request fail
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
- val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11)
- val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17)
+ val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11)
+ val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17)
val response1 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
- val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21)
+ val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21)
val response2 = ProducerResponse(0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 792919b..04acef5 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("request.timeout.ms", "2000")
-// props.put("request.required.acks", "-1")
+ props.put("request.required.acks", "1")
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
// create topic
@@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
props.put("request.timeout.ms", String.valueOf(timeoutMs))
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ props.put("request.required.acks", "1")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 89ba944..81b2736 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,7 +18,6 @@
package kafka.producer
import java.net.SocketTimeoutException
-import java.util.Properties
import junit.framework.Assert
import kafka.admin.CreateTopicCommand
import kafka.integration.KafkaServerTestHarness
@@ -38,16 +37,13 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testReachableServer() {
val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("send.buffer.bytes", "102400")
- props.put("connect.timeout.ms", "500")
- props.put("reconnect.interval", "1000")
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
val producer = new SyncProducer(new SyncProducerConfig(props))
val firstStart = SystemTime.milliseconds
try {
- val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+ val response = producer.send(TestUtils.produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
Assert.assertNotNull(response)
} catch {
case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -56,7 +52,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertTrue((firstEnd-firstStart) < 500)
val secondStart = SystemTime.milliseconds
try {
- val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+ val response = producer.send(TestUtils.produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
Assert.assertNotNull(response)
} catch {
case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -64,7 +61,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val secondEnd = SystemTime.milliseconds
Assert.assertTrue((secondEnd-secondStart) < 500)
try {
- val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
+ val response = producer.send(TestUtils.produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
Assert.assertNotNull(response)
} catch {
case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
@@ -74,36 +72,31 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testEmptyProduceRequest() {
val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("send.buffer.bytes", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
val correlationId = 0
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val ack = SyncProducerConfig.DefaultRequiredAcks
+ val ack: Short = 1
val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
-
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)
+ Assert.assertTrue(response != null)
Assert.assertTrue(!response.hasError && response.status.size == 0)
}
@Test
def testMessageSizeTooLarge() {
val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+
val producer = new SyncProducer(new SyncProducerConfig(props))
CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
- val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
+ val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
@@ -112,7 +105,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1
val message2 = new Message(new Array[Byte](safeSize))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
- val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
+ val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
@@ -122,12 +115,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@Test
def testProduceCorrectlyReceivesResponse() {
val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("send.buffer.bytes", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -173,15 +161,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val timeoutMs = 500
val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("send.buffer.bytes", "102400")
- props.put("request.timeout.ms", String.valueOf(timeoutMs))
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
- val request = TestUtils.produceRequest("topic1", 0, messages)
+ val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1)
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
@@ -196,8 +180,21 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
}
val t2 = SystemTime.milliseconds
-
// make sure we don't wait fewer than timeoutMs for a response
Assert.assertTrue((t2-t1) >= timeoutMs)
}
+
+ @Test
+ def testProduceRequestWithNoResponse() {
+ val server = servers.head
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ val correlationId = 0
+ val clientId = SyncProducerConfig.DefaultClientId
+ val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+ val ack: Short = 0
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val response = producer.send(emptyRequest)
+ Assert.assertTrue(response == null)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index cd724a3..db46247 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("request.required.acks", "-1")
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 7afbe54..3728f8c 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -44,7 +44,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
- val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)
+ val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)))
producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9400328..217ff7a 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -301,16 +301,25 @@ object TestUtils extends Logging {
new Producer[K, V](new ProducerConfig(props))
}
- def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int,
- reconnectInterval: Int): Properties = {
+ def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = {
val props = new Properties()
- props.put("producer.type", "sync")
props.put("broker.list", brokerList)
- props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
- props.put("send.buffer.bytes", bufferSize.toString)
- props.put("connect.timeout.ms", connectTimeout.toString)
- props.put("reconnect.interval", reconnectInterval.toString)
- props.put("request.timeout.ms", 30000.toString)
+ props.put("partitioner.class", partitioner)
+ props.put("message.send.max.retries", "3")
+ props.put("retry.backoff.ms", "1000")
+ props.put("request.timeout.ms", "500")
+ props.put("request.required.acks", "-1")
+ props.put("serializer.class", classOf[StringEncoder].getName.toString)
+
+ props
+ }
+
+ def getSyncProducerConfig(port: Int): Properties = {
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", port.toString)
+ props.put("request.timeout.ms", "500")
+ props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props
}