You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/19 21:51:13 UTC
svn commit: r1363507 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/consumer/ main/scala/kafka/javaapi/consumer/
main/scala/kafka/producer/ main/scala/kafka/server/
test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/ test/scala...
Author: nehanarkhede
Date: Thu Jul 19 19:51:12 2012
New Revision: 1363507
URL: http://svn.apache.org/viewvc?rev=1363507&view=rev
Log:
KAFKA-352 Unknown topic error code handling for all requests; patched by Neha Narkhede; reviewed by Jay Kreps
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Jul 19 19:51:12 2012
@@ -20,6 +20,7 @@ package kafka.consumer
import kafka.api._
import kafka.network._
import kafka.utils._
+import kafka.common.ErrorMapping
/**
* A consumer of kafka messages
@@ -111,8 +112,10 @@ class SimpleConsumer( val host: String,
*/
def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
- val response = sendRequest(request)
- OffsetResponse.readFrom(response.buffer).offsets
+ val offsetResponse = OffsetResponse.readFrom(sendRequest(request).buffer)
+ // try to throw exception based on global error codes
+ ErrorMapping.maybeThrowException(offsetResponse.errorCode)
+ offsetResponse.offsets
}
private def getOrMakeConnection() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu Jul 19 19:51:12 2012
@@ -18,13 +18,14 @@
package kafka.javaapi.consumer;
-import java.util.List;
-import java.util.Map;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.message.Message;
import kafka.serializer.Decoder;
+import java.util.List;
+import java.util.Map;
+
public interface ConsumerConnector {
/**
* Create a list of MessageStreams of type T for each topic.
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Thu Jul 19 19:51:12 2012
@@ -20,7 +20,7 @@ package kafka.producer
import kafka.cluster.Broker
import java.util.Properties
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, Utils, Logging}
+import kafka.utils.{ZkUtils, Logging}
import collection.mutable.HashMap
import java.lang.Object
import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Jul 19 19:51:12 2012
@@ -168,8 +168,6 @@ class KafkaApis(val requestChannel: Requ
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
try {
- // TODO: should use replicaManager for ensurePartitionLeaderOnThisBroker?
- // although this ties in with KAFKA-352
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
@@ -357,8 +355,20 @@ class KafkaApis(val requestChannel: Requ
val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Offset request " + offsetRequest.toString)
- val offsets = logManager.getOffsets(offsetRequest)
- val response = new OffsetResponse(offsetRequest.versionId, offsets)
+ var response: OffsetResponse = null
+ try {
+ kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
+ val offsets = logManager.getOffsets(offsetRequest)
+ response = new OffsetResponse(offsetRequest.versionId, offsets)
+ }catch {
+ case ioe: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
+ System.exit(1)
+ case e =>
+ warn("Error while responding to offset request", e)
+ response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+ ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+ }
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
@@ -367,7 +377,6 @@ class KafkaApis(val requestChannel: Requ
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
-
if(requestLogger.isTraceEnabled)
requestLogger.trace("Topic metadata request " + metadataRequest.toString())
@@ -395,7 +404,6 @@ class KafkaApis(val requestChannel: Requ
}
}
}
- info("Sending response for topic metadata request")
val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Thu Jul 19 19:51:12 2012
@@ -25,7 +25,7 @@ import org.I0Itec.zkclient.{IZkDataListe
import kafka.admin.AdminUtils
import java.lang.{Thread, IllegalStateException}
import collection.mutable.HashSet
-import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
+import kafka.common._
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -41,8 +41,8 @@ class KafkaZooKeeper(config: KafkaConfig
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
private var zkClient: ZkClient = null
- private val leaderChangeListener = new LeaderChangeListener
- private val topicPartitionsChangeListener = new TopicChangeListener
+ private var leaderChangeListener: LeaderChangeListener = null
+ private var topicPartitionsChangeListener: TopicChangeListener = null
private var stateChangeHandler: StateChangeCommandHandler = null
private val topicListenerLock = new Object
@@ -52,6 +52,8 @@ class KafkaZooKeeper(config: KafkaConfig
/* start client */
info("connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ leaderChangeListener = new LeaderChangeListener
+ topicPartitionsChangeListener = new TopicChangeListener
startStateChangeCommandHandler()
zkClient.subscribeStateChanges(new SessionExpireListener)
registerBrokerInZk()
@@ -112,9 +114,8 @@ class KafkaZooKeeper(config: KafkaConfig
}
def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
- // TODO: KAFKA-352 first check if this topic exists in the cluster
-// if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
-// throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
+ if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+ throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
// check if partition id is invalid
if(partition < 0)
throw new InvalidPartitionException("Partition %d is invalid".format(partition))
@@ -287,6 +288,8 @@ class KafkaZooKeeper(config: KafkaConfig
class TopicChangeListener extends IZkChildListener with Logging {
private val allTopics = new HashSet[String]()
+ // read existing topics, if any
+ allTopics ++= ZkUtils.getAllTopics(zkClient)
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -297,17 +300,20 @@ class KafkaZooKeeper(config: KafkaConfig
val newTopics = currentChildren -- allTopics
val deletedTopics = allTopics -- currentChildren
allTopics.clear()
- allTopics ++ currentChildren
+ allTopics ++= currentChildren
debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+ debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
handleNewTopics(newTopics.toSeq)
// TODO: Handle topic deletions
- //handleDeletedTopics(deletedTopics.toSeq)
+ // handleDeletedTopics(deletedTopics.toSeq)
}
}
def doesTopicExistInCluster(topic: String): Boolean = {
- allTopics.contains(topic)
+ topicListenerLock.synchronized {
+ allTopics.contains(topic)
+ }
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Thu Jul 19 19:51:12 2012
@@ -32,7 +32,7 @@ import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Jul 19 19:51:12 2012
@@ -30,6 +30,7 @@ import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.common.UnknownTopicException
object LogOffsetTest {
val random = new Random()
@@ -63,6 +64,16 @@ class LogOffsetTest extends JUnit3Suite
}
@Test
+ def testGetOffsetsForUnknownTopic() {
+ try {
+ simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
+ fail("Should fail with UnknownTopicException since topic foo was never created")
+ }catch {
+ case e: UnknownTopicException => // this is ok
+ }
+ }
+
+ @Test
def testGetOffsetsBeforeLatestTime() {
val topicPartition = "kafka-" + 0
val topic = topicPartition.split("-").head
@@ -104,14 +115,14 @@ class LogOffsetTest extends JUnit3Suite
topicLogDir.mkdir
val topic = topicPartition.split("-").head
- val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
var offsetChanged = false
for(i <- 1 to 14) {
- val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+ val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, 0,
OffsetRequest.EarliestTime, 1)
if(consumerOffsets(0) == 1) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Jul 19 19:51:12 2012
@@ -132,7 +132,7 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(request.correlationId, response.correlationId)
Assert.assertEquals(response.errors.length, response.offsets.length)
Assert.assertEquals(3, response.errors.length)
- response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
+ response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
response.offsets.foreach(Assert.assertEquals(-1L, _))
// #2 - test that we get correct offsets when partition is owned by broker
@@ -141,7 +141,6 @@ class SyncProducerTest extends JUnit3Sui
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
- Thread.sleep(500)
val response2 = producer.send(request)
Assert.assertNotNull(response2)
Assert.assertEquals(request.correlationId, response2.correlationId)
@@ -154,8 +153,8 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
- // the middle message should have been rejected because broker doesn't lead partition
- Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
+ // the middle message should have been rejected because the topic does not exist
+ Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
Assert.assertEquals(-1, response2.offsets(1))
}
@@ -180,7 +179,7 @@ class SyncProducerTest extends JUnit3Sui
val t1 = SystemTime.milliseconds
try {
- val response2 = producer.send(request)
+ producer.send(request)
Assert.fail("Should have received timeout exception since request handling is stopped.")
} catch {
case e: SocketTimeoutException => /* success */
@@ -191,4 +190,28 @@ class SyncProducerTest extends JUnit3Sui
// make sure we don't wait fewer than timeoutMs for a response
Assert.assertTrue((t2-t1) >= timeoutMs)
}
+
+ @Test
+ def testProduceRequestForUnknownTopic() {
+ val server = servers.head
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", server.socketServer.port.toString)
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "300")
+ props.put("reconnect.interval", "500")
+ props.put("max.message.size", "100")
+
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
+
+ val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
+ val response = producer.send(request)
+
+ Assert.assertNotNull(response)
+ Assert.assertEquals(request.correlationId, response.correlationId)
+ Assert.assertEquals(response.errors.length, response.offsets.length)
+ Assert.assertEquals(3, response.errors.length)
+ response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
+ }
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1363507&r1=1363506&r2=1363507&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Jul 19 19:51:12 2012
@@ -20,7 +20,6 @@ import java.io.File
import kafka.consumer.SimpleConsumer
import java.util.Properties
import org.junit.Test
-import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import kafka.message.{Message, ByteBufferMessageSet}
import org.scalatest.junit.JUnit3Suite