You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:18 UTC
[33/36] git commit: KAFKA-955 After a leader change, messages sent
with ack=0 are lost; reviewed by Jay Kreps, Neha Narkhede and Jun Rao
KAFKA-955 After a leader change, messages sent with ack=0 are lost; reviewed by Jay Kreps, Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f89ddced
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f89ddced
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f89ddced
Branch: refs/heads/trunk
Commit: f89ddced1ba058f0c51697957cde8bb2e2b05c4d
Parents: ea54700
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Wed Aug 28 10:16:50 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Aug 28 10:16:59 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 24 +++++++++++++-
.../main/scala/kafka/network/SocketServer.scala | 34 ++++++++++++--------
.../src/main/scala/kafka/server/KafkaApis.scala | 17 +++++++---
.../unit/kafka/producer/ProducerTest.scala | 4 ++-
.../unit/kafka/producer/SyncProducerTest.scala | 27 ++++++++++++++++
5 files changed, 86 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1437496..77d7ec0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -84,12 +84,20 @@ object RequestChannel extends Logging {
}
}
- case class Response(processor: Int, request: Request, responseSend: Send) {
+ case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
request.responseCompleteTimeMs = SystemTime.milliseconds
+ def this(processor: Int, request: Request, responseSend: Send) =
+ this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
+
def this(request: Request, send: Send) =
this(request.processor, request, send)
}
+
+ trait ResponseAction
+ case object SendAction extends ResponseAction
+ case object NoOpAction extends ResponseAction
+ case object CloseConnectionAction extends ResponseAction
}
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
@@ -127,6 +135,20 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
onResponse(response.processor)
}
+ /** No response needs to be sent for the request; the processor should go back to reading more requests from the network */
+ def noOperation(processor: Int, request: RequestChannel.Request) {
+ responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
+ for(onResponse <- responseListeners)
+ onResponse(processor)
+ }
+
+ /** Close the connection for the request */
+ def closeConnection(processor: Int, request: RequestChannel.Request) {
+ responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
+ for(onResponse <- responseListeners)
+ onResponse(processor)
+ }
+
/** Get the next request or block until there is one */
def receiveRequest(): RequestChannel.Request =
requestQueue.take()
http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5bd143..216245d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -274,25 +274,33 @@ private[kafka] class Processor(val id: Int,
while(curr != null) {
val key = curr.request.requestKey.asInstanceOf[SelectionKey]
try {
- if(curr.responseSend == null) {
- // a null response send object indicates that there is no response to send to the client.
- // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests
- // that are sitting in the server's socket buffer
- trace("Socket server received empty response to send, registering for read: " + curr)
- key.interestOps(SelectionKey.OP_READ)
- key.attach(null)
- curr.request.updateRequestMetrics
- } else {
- trace("Socket server received response to send, registering for write: " + curr)
- key.interestOps(SelectionKey.OP_WRITE)
- key.attach(curr)
+ curr.responseAction match {
+ case RequestChannel.NoOpAction => {
+ // There is no response to send to the client, we need to read more pipelined requests
+ // that are sitting in the server's socket buffer
+ curr.request.updateRequestMetrics
+ trace("Socket server received empty response to send, registering for read: " + curr)
+ key.interestOps(SelectionKey.OP_READ)
+ key.attach(null)
+ }
+ case RequestChannel.SendAction => {
+ trace("Socket server received response to send, registering for write: " + curr)
+ key.interestOps(SelectionKey.OP_WRITE)
+ key.attach(curr)
+ }
+ case RequestChannel.CloseConnectionAction => {
+ curr.request.updateRequestMetrics
+ trace("Closing socket connection actively according to the response code.")
+ close(key)
+ }
+ case responseCode => throw new KafkaException("No mapping found for response code " + responseCode)
}
} catch {
case e: CancelledKeyException => {
debug("Ignoring response for closed socket.")
close(key)
}
- }finally {
+ } finally {
curr = requestChannel.receiveResponse(id)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b17964e..cd02aab 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,8 +51,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* and is queried by the topic metadata request. */
var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
-// private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
- private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+ private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
private val partitionMetadataLock = new Object
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -170,9 +169,17 @@ class KafkaApis(val requestChannel: RequestChannel,
!produceRequest.data.keySet.exists(
m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
if(produceRequest.requiredAcks == 0) {
- // send a fake producer response if producer request.required.acks = 0. This mimics the behavior of a 0.7 producer
- // and is tuned for very high throughput
- requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null))
+ // No operation is needed if the producer's request.required.acks = 0. However, if there is any error in handling the request,
+ // the handler sends a close-connection response to the socket server, because the producer does not expect a response.
+ // Closing the socket lets the producer client know that an error has happened, so that it will refresh its metadata.
+ if (numPartitionsInError != 0) {
+ info(("Send the close connection response due to error handling produce request " +
+ "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
+ .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
+ requestChannel.closeConnection(request.processor, request)
+ } else {
+ requestChannel.noOperation(request.processor, request)
+ }
} else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
allPartitionHaveReplicationFactorOne ||
http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index b511d90..29331db 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,6 +17,7 @@
package kafka.producer
+import org.scalatest.TestFailedException
import org.scalatest.junit.JUnit3Suite
import kafka.consumer.SimpleConsumer
import kafka.message.Message
@@ -236,7 +237,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
fail("Should fail since no leader exists for the partition.")
} catch {
- case e => // success
+ case e : TestFailedException => throw e // catch and re-throw the failure message
+ case e2 => // otherwise success
}
// restart server 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/f89ddced/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index b5ee31d..b3e89c3 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -114,6 +114,33 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
}
@Test
+ def testMessageSizeTooLargeWithAckZero() {
+ val server = servers.head
+
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ props.put("request.required.acks", "0")
+
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
+
+ // This message will be dropped silently since the message size is too large.
+ producer.send(TestUtils.produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+
+ // Send another message whose size is large enough to exceed the buffer size so
+ // the socket buffer will be flushed immediately;
+ // this send should fail since the socket has been closed
+ try {
+ producer.send(TestUtils.produceRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0))
+ } catch {
+ case e : java.io.IOException => // success
+ case e2 => throw e2
+ }
+ }
+
+ @Test
def testProduceCorrectlyReceivesResponse() {
val server = servers.head
val props = TestUtils.getSyncProducerConfig(server.socketServer.port)