You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/19 21:51:13 UTC

svn commit: r1363507 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/javaapi/consumer/ main/scala/kafka/producer/ main/scala/kafka/server/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/ test/scala...

Author: nehanarkhede
Date: Thu Jul 19 19:51:12 2012
New Revision: 1363507

URL: http://svn.apache.org/viewvc?rev=1363507&view=rev
Log:
KAFKA-352 Unknown topic error code handling for all requests; patched by Neha Narkhede; reviewed by Jay Kreps

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Jul 19 19:51:12 2012
@@ -20,6 +20,7 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
+import kafka.common.ErrorMapping
 
 /**
  * A consumer of kafka messages
@@ -111,8 +112,10 @@ class SimpleConsumer( val host: String,
    */
   def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
     val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
-    val response = sendRequest(request)
-    OffsetResponse.readFrom(response.buffer).offsets
+    val offsetResponse = OffsetResponse.readFrom(sendRequest(request).buffer)
+    // try to throw exception based on global error codes
+    ErrorMapping.maybeThrowException(offsetResponse.errorCode)
+    offsetResponse.offsets
   }
 
   private def getOrMakeConnection() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu Jul 19 19:51:12 2012
@@ -18,13 +18,14 @@
 package kafka.javaapi.consumer;
 
 
-import java.util.List;
-import java.util.Map;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.message.Message;
 import kafka.serializer.Decoder;
 
+import java.util.List;
+import java.util.Map;
+
 public interface ConsumerConnector {
   /**
    *  Create a list of MessageStreams of type T for each topic.

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Thu Jul 19 19:51:12 2012
@@ -20,7 +20,7 @@ package kafka.producer
 import kafka.cluster.Broker
 import java.util.Properties
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, Utils, Logging}
+import kafka.utils.{ZkUtils, Logging}
 import collection.mutable.HashMap
 import java.lang.Object
 import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Jul 19 19:51:12 2012
@@ -168,8 +168,6 @@ class KafkaApis(val requestChannel: Requ
         BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
-          // TODO: should use replicaManager for ensurePartitionLeaderOnThisBroker?
-          // although this ties in with KAFKA-352
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
@@ -357,8 +355,20 @@ class KafkaApis(val requestChannel: Requ
     val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Offset request " + offsetRequest.toString)
-    val offsets = logManager.getOffsets(offsetRequest)
-    val response = new OffsetResponse(offsetRequest.versionId, offsets)
+    var response: OffsetResponse = null
+    try {
+      kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
+      val offsets = logManager.getOffsets(offsetRequest)
+      response = new OffsetResponse(offsetRequest.versionId, offsets)
+    }catch {
+      case ioe: IOException =>
+        fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
+        System.exit(1)
+      case e =>
+        warn("Error while responding to offset request", e)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+    }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -367,7 +377,6 @@ class KafkaApis(val requestChannel: Requ
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
-
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Topic metadata request " + metadataRequest.toString())
 
@@ -395,7 +404,6 @@ class KafkaApis(val requestChannel: Requ
           }
       }
     }
-    info("Sending response for topic metadata request")
     val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Thu Jul 19 19:51:12 2012
@@ -25,7 +25,7 @@ import org.I0Itec.zkclient.{IZkDataListe
 import kafka.admin.AdminUtils
 import java.lang.{Thread, IllegalStateException}
 import collection.mutable.HashSet
-import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
+import kafka.common._
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -41,8 +41,8 @@ class KafkaZooKeeper(config: KafkaConfig
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
-  private val leaderChangeListener = new LeaderChangeListener
-  private val topicPartitionsChangeListener = new TopicChangeListener
+  private var leaderChangeListener: LeaderChangeListener = null
+  private var topicPartitionsChangeListener: TopicChangeListener = null
   private var stateChangeHandler: StateChangeCommandHandler = null
 
   private val topicListenerLock = new Object
@@ -52,6 +52,8 @@ class KafkaZooKeeper(config: KafkaConfig
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
     startStateChangeCommandHandler()
     zkClient.subscribeStateChanges(new SessionExpireListener)
     registerBrokerInZk()
@@ -112,9 +114,8 @@ class KafkaZooKeeper(config: KafkaConfig
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
-    // TODO: KAFKA-352 first check if this topic exists in the cluster
-//    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
-//      throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
+    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+      throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
     // check if partition id is invalid
     if(partition < 0)
       throw new InvalidPartitionException("Partition %d is invalid".format(partition))
@@ -287,6 +288,8 @@ class KafkaZooKeeper(config: KafkaConfig
 
   class TopicChangeListener extends IZkChildListener with Logging {
     private val allTopics = new HashSet[String]()
+    // read existing topics, if any
+    allTopics ++= ZkUtils.getAllTopics(zkClient)
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -297,17 +300,20 @@ class KafkaZooKeeper(config: KafkaConfig
         val newTopics = currentChildren -- allTopics
         val deletedTopics = allTopics -- currentChildren
         allTopics.clear()
-        allTopics ++ currentChildren
+        allTopics ++= currentChildren
 
         debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+        debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
         handleNewTopics(newTopics.toSeq)
         // TODO: Handle topic deletions
-        //handleDeletedTopics(deletedTopics.toSeq)
+        // handleDeletedTopics(deletedTopics.toSeq)
       }
     }
 
     def doesTopicExistInCluster(topic: String): Boolean = {
-      allTopics.contains(topic)
+      topicListenerLock.synchronized {
+        allTopics.contains(topic)
+      }
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Thu Jul 19 19:51:12 2012
@@ -32,7 +32,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Jul 19 19:51:12 2012
@@ -30,6 +30,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.common.UnknownTopicException
 
 object LogOffsetTest {
   val random = new Random()  
@@ -63,6 +64,16 @@ class LogOffsetTest extends JUnit3Suite 
   }
 
   @Test
+  def testGetOffsetsForUnknownTopic() {
+    try {
+      simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
+      fail("Should fail with UnknownTopicException since topic foo was never created")
+    }catch {
+      case e: UnknownTopicException => // this is ok
+    }
+  }
+
+  @Test
   def testGetOffsetsBeforeLatestTime() {
     val topicPartition = "kafka-" + 0
     val topic = topicPartition.split("-").head
@@ -104,14 +115,14 @@ class LogOffsetTest extends JUnit3Suite 
     topicLogDir.mkdir
 
     val topic = topicPartition.split("-").head
-    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
 
     var offsetChanged = false
     for(i <- 1 to 14) {
-      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, 0,
         OffsetRequest.EarliestTime, 1)
 
       if(consumerOffsets(0) == 1) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Jul 19 19:51:12 2012
@@ -132,7 +132,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owned by broker
@@ -141,7 +141,6 @@ class SyncProducerTest extends JUnit3Sui
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
     TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
 
-    Thread.sleep(500)
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
@@ -154,8 +153,8 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
-    // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
+    // the middle message should have been rejected because the topic does not exist
+    Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 
@@ -180,7 +179,7 @@ class SyncProducerTest extends JUnit3Sui
 
     val t1 = SystemTime.milliseconds
     try {
-      val response2 = producer.send(request)
+      producer.send(request)
       Assert.fail("Should have received timeout exception since request handling is stopped.")
     } catch {
       case e: SocketTimeoutException => /* success */
@@ -191,4 +190,28 @@ class SyncProducerTest extends JUnit3Sui
     // make sure we don't wait fewer than timeoutMs for a response
     Assert.assertTrue((t2-t1) >= timeoutMs)
   }
+
+  @Test
+  def testProduceRequestForUnknownTopic() {
+    val server = servers.head
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
+
+    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
+    val response = producer.send(request)
+
+    Assert.assertNotNull(response)
+    Assert.assertEquals(request.correlationId, response.correlationId)
+    Assert.assertEquals(response.errors.length, response.offsets.length)
+    Assert.assertEquals(3, response.errors.length)
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Jul 19 19:51:12 2012
@@ -20,7 +20,6 @@ import java.io.File
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite