You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/27 02:16:08 UTC

svn commit: r1366244 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/server/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/producer/

Author: junrao
Date: Fri Jul 27 00:16:08 2012
New Revision: 1366244

URL: http://svn.apache.org/viewvc?rev=1366244&view=rev
Log:
Deal with empty TopicData lists in producer and fetch requests; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-412

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Fri Jul 27 00:16:08 2012
@@ -142,6 +142,8 @@ case class FetchRequest(versionId: Short
   }
 
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+
+  def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
 }
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Jul 27 00:16:08 2012
@@ -117,7 +117,9 @@ class KafkaApis(val requestChannel: Requ
     val response = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
-    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1) {
+    if (produceRequest.requiredAcks == 0 ||
+        produceRequest.requiredAcks == 1 ||
+        produceRequest.data.size <= 0) {
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
 
       for (topicData <- produceRequest.data)
@@ -230,7 +232,9 @@ class KafkaApis(val requestChannel: Requ
 
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
-    if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
+    if(fetchRequest.maxWait <= 0 ||
+       availableBytes >= fetchRequest.minBytes ||
+       fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d"
         .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Fri Jul 27 00:16:08 2012
@@ -32,7 +32,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -93,6 +93,13 @@ class PrimitiveApiTest extends JUnit3Sui
     }
   }
 
+  def testEmptyFetchRequest() {
+    val offsets = Array[OffsetDetail]()
+    val request = new FetchRequest(offsetInfo = offsets)
+    val fetched = consumer.fetch(request)
+    assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+  }
+
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1366244&r1=1366243&r2=1366244&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Fri Jul 27 00:16:08 2012
@@ -28,6 +28,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
+import kafka.api.TopicData
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -71,6 +72,27 @@ class SyncProducerTest extends JUnit3Sui
   }
 
   @Test
+  def testEmptyProduceRequest() {
+    val server = servers.head
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+    val correlationId = SyncProducerConfig.DefaultCorrelationId
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val response = producer.send(emptyRequest)
+    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+  }
+
+  @Test
   def testSingleMessageSizeTooLarge() {
     val server = servers.head
     val props = new Properties()