You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/27 02:16:08 UTC
svn commit: r1366244 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/api/ main/scala/kafka/server/
test/scala/unit/kafka/integration/ test/scala/unit/kafka/producer/
Author: junrao
Date: Fri Jul 27 00:16:08 2012
New Revision: 1366244
URL: http://svn.apache.org/viewvc?rev=1366244&view=rev
Log:
deal with empty TopicData list in producer and fetch request; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-412
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Fri Jul 27 00:16:08 2012
@@ -142,6 +142,8 @@ case class FetchRequest(versionId: Short
}
def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+
+ def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Jul 27 00:16:08 2012
@@ -117,7 +117,9 @@ class KafkaApis(val requestChannel: Requ
val response = produceToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
- if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1) {
+ if (produceRequest.requiredAcks == 0 ||
+ produceRequest.requiredAcks == 1 ||
+ produceRequest.data.size <= 0) {
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
for (topicData <- produceRequest.data)
@@ -230,7 +232,9 @@ class KafkaApis(val requestChannel: Requ
// if there are enough bytes available right now we can answer the request, otherwise we have to punt
val availableBytes = availableFetchBytes(fetchRequest)
- if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
+ if(fetchRequest.maxWait <= 0 ||
+ availableBytes >= fetchRequest.minBytes ||
+ fetchRequest.numPartitions <= 0) {
val topicData = readMessageSets(fetchRequest)
debug("Returning fetch response %s for fetch request with correlation id %d"
.format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jul 27 00:16:08 2012
@@ -32,7 +32,7 @@ import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
@@ -93,6 +93,13 @@ class PrimitiveApiTest extends JUnit3Sui
}
}
+ def testEmptyFetchRequest() {
+ val offsets = Array[OffsetDetail]()
+ val request = new FetchRequest(offsetInfo = offsets)
+ val fetched = consumer.fetch(request)
+ assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+ }
+
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
val props = new Properties()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jul 27 00:16:08 2012
@@ -28,6 +28,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
+import kafka.api.TopicData
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2);
@@ -71,6 +72,27 @@ class SyncProducerTest extends JUnit3Sui
}
@Test
+ def testEmptyProduceRequest() {
+ val server = servers.head
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", server.socketServer.port.toString)
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "300")
+ props.put("reconnect.interval", "500")
+ props.put("max.message.size", "100")
+ val correlationId = SyncProducerConfig.DefaultCorrelationId
+ val clientId = SyncProducerConfig.DefaultClientId
+ val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+ val ack = SyncProducerConfig.DefaultRequiredAcks
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val response = producer.send(emptyRequest)
+ Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+ }
+
+ @Test
def testSingleMessageSizeTooLarge() {
val server = servers.head
val props = new Properties()