You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/26 19:12:31 UTC
svn commit: r1305454 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/consumer/ main/scala/kafka/network/
main/scala/kafka/producer/ main/scala/kafka/producer/async/
main/scala/kafka/utils/ test/scala/unit/kafka/producer/
Author: junrao
Date: Mon Mar 26 17:12:30 2012
New Revision: 1305454
URL: http://svn.apache.org/viewvc?rev=1305454&view=rev
Log:
SyncProducer does not correctly timeout; patched by Prashanth Menon; reviewed by Jun Rao, Neha Narkhede; KAFKA-305
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Mar 26 17:12:30 2012
@@ -17,91 +17,89 @@
package kafka.consumer
-import java.net._
-import java.nio.channels._
import kafka.api._
import kafka.network._
import kafka.utils._
-import kafka.utils.Utils._
/**
* A consumer of kafka messages
*/
@threadsafe
-class SimpleConsumer(val host: String,
- val port: Int,
- val soTimeout: Int,
- val bufferSize: Int) extends Logging {
- private var channel : SocketChannel = null
+class SimpleConsumer( val host: String,
+ val port: Int,
+ val soTimeout: Int,
+ val bufferSize: Int ) extends Logging {
+
private val lock = new Object()
+ private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout)
- private def connect(): SocketChannel = {
+ private def connect(): BlockingChannel = {
close
- val address = new InetSocketAddress(host, port)
+ blockingChannel.connect()
+ blockingChannel
+ }
+
+ private def disconnect() = {
+ if(blockingChannel.isConnected) {
+ debug("Disconnecting from " + host + ":" + port)
+ blockingChannel.disconnect()
+ }
+ }
- val channel = SocketChannel.open
- debug("Connected to " + address + " for fetching.")
- channel.configureBlocking(true)
- channel.socket.setReceiveBufferSize(bufferSize)
- channel.socket.setSoTimeout(soTimeout)
- channel.socket.setKeepAlive(true)
- channel.connect(address)
- trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
- trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
-
- channel
- }
-
- private def close(channel: SocketChannel) = {
- debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
- swallow(channel.close())
- swallow(channel.socket.close())
+ private def reconnect() {
+ disconnect()
+ connect()
}
def close() {
lock synchronized {
- if (channel != null)
- close(channel)
- channel = null
+ disconnect()
}
}
-
- /**
- * Fetch a set of messages from a topic.
- *
- * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
- * @return a set of fetched messages
- */
- def fetch(request: FetchRequest): FetchResponse = {
+
+ private def sendRequest(request: Request): Tuple2[Receive, Int] = {
lock synchronized {
- val startTime = SystemTime.nanoseconds
getOrMakeConnection()
var response: Tuple2[Receive,Int] = null
try {
- sendRequest(request, channel)
- response = getResponse(channel)
+ blockingChannel.send(request)
+ response = blockingChannel.receive()
} catch {
case e : java.io.IOException =>
- info("Reconnect in fetch request due to socket error: ", e)
+ info("Reconnect in due to socket error: ", e)
// retry once
try {
- channel = connect
- sendRequest(request, channel)
- response = getResponse(channel)
+ reconnect()
+ blockingChannel.send(request)
+ response = blockingChannel.receive()
} catch {
- case ioe: java.io.IOException => channel = null; throw ioe;
+ case ioe: java.io.IOException =>
+ disconnect()
+ throw ioe
}
case e => throw e
}
- val fetchResponse = FetchResponse.readFrom(response._1.buffer)
- val fetchedSize = fetchResponse.sizeInBytes
+ response
+ }
+ }
- val endTime = SystemTime.nanoseconds
- SimpleConsumerStats.recordFetchRequest(endTime - startTime)
- SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
+ /**
+ * Fetch a set of messages from a topic.
+ *
+ * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+ * @return a set of fetched messages
+ */
+ def fetch(request: FetchRequest): FetchResponse = {
+ val startTime = SystemTime.nanoseconds
+ val response = sendRequest(request)
+ val fetchResponse = FetchResponse.readFrom(response._1.buffer)
+ val fetchedSize = fetchResponse.sizeInBytes
+
+ val endTime = SystemTime.nanoseconds
+ SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+ SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
- fetchResponse
- }
+ fetchResponse
}
/**
@@ -112,31 +110,14 @@ class SimpleConsumer(val host: String,
* @return an array of offsets
*/
def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
- lock synchronized {
- getOrMakeConnection()
- var response: Tuple2[Receive,Int] = null
- try {
- sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
- response = getResponse(channel)
- } catch {
- case e : java.io.IOException =>
- info("Reconnect in get offetset request due to socket error: ", e)
- // retry once
- try {
- channel = connect
- sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
- response = getResponse(channel)
- } catch {
- case ioe: java.io.IOException => channel = null; throw ioe;
- }
- }
- OffsetRequest.deserializeOffsetArray(response._1.buffer)
- }
+ val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
+ val response = sendRequest(request)
+ OffsetRequest.deserializeOffsetArray(response._1.buffer)
}
private def getOrMakeConnection() {
- if(channel == null) {
- channel = connect()
+ if(!blockingChannel.isConnected) {
+ connect()
}
}
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1305454&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Mon Mar 26 17:12:30 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetSocketAddress
+import java.nio.channels._
+import kafka.utils.{nonthreadsafe, Logging}
+
+/**
+ * A simple blocking channel with timeouts correctly enabled.
+ *
+ */
+@nonthreadsafe
+class BlockingChannel( val host: String,
+ val port: Int,
+ val readBufferSize: Int,
+ val writeBufferSize: Int,
+ val readTimeoutMs: Int ) extends Logging {
+
+ private var connected = false
+ private var channel: SocketChannel = null
+ private var readChannel: ReadableByteChannel = null
+ private var writeChannel: GatheringByteChannel = null
+ private val lock = new Object()
+
+ def connect() = lock synchronized {
+ if(!connected) {
+ channel = SocketChannel.open()
+ if(readBufferSize > 0)
+ channel.socket.setReceiveBufferSize(readBufferSize)
+ if(writeBufferSize > 0)
+ channel.socket.setSendBufferSize(writeBufferSize)
+ channel.configureBlocking(true)
+ channel.socket.setSoTimeout(readTimeoutMs)
+ channel.socket.setKeepAlive(true)
+ channel.connect(new InetSocketAddress(host, port))
+
+ writeChannel = channel
+ readChannel = Channels.newChannel(channel.socket().getInputStream)
+ connected = true
+ }
+ }
+
+ def disconnect() = lock synchronized {
+ if(connected || channel != null) {
+ // closing the main socket channel *should* close the read channel
+ // but let's do it to be sure.
+ swallow(channel.close())
+ swallow(channel.socket.close())
+ swallow(readChannel.close())
+ channel = null; readChannel = null; writeChannel = null
+ connected = false
+ }
+ }
+
+ def isConnected = connected
+
+ def send(request: Request):Int = {
+ if(!connected)
+ throw new ClosedChannelException()
+
+ val send = new BoundedByteBufferSend(request)
+ send.writeCompletely(writeChannel)
+ }
+
+ def receive(): Tuple2[Receive, Int] = {
+ if(!connected)
+ throw new ClosedChannelException()
+
+ val response = new BoundedByteBufferReceive()
+ response.readCompletely(readChannel)
+
+ // this has the side effect of setting the initial position of buffer correctly
+ val errorCode: Int = response.buffer.getShort
+ (response, errorCode)
+ }
+
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Mon Mar 26 17:12:30 2012
@@ -50,12 +50,13 @@ trait Receive extends Transmission {
def readFrom(channel: ReadableByteChannel): Int
def readCompletely(channel: ReadableByteChannel): Int = {
- var read = 0
+ var totalRead = 0
while(!complete) {
- read = readFrom(channel)
+ val read = readFrom(channel)
trace(read + " bytes read.")
+ totalRead += read
}
- read
+ totalRead
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Mar 26 17:12:30 2012
@@ -17,14 +17,11 @@
package kafka.producer
-import java.net.InetSocketAddress
-import java.nio.channels.SocketChannel
import kafka.api._
import kafka.common.MessageSizeTooLargeException
import kafka.message.MessageSet
-import kafka.network.{BoundedByteBufferSend, Request, Receive}
+import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
import kafka.utils._
-import kafka.utils.Utils._
/*
* Send a message set.
@@ -33,11 +30,10 @@ import kafka.utils.Utils._
class SyncProducer(val config: SyncProducerConfig) extends Logging {
private val MaxConnectBackoffMs = 60000
- private var channel : SocketChannel = null
private var sentOnConnection = 0
private val lock = new Object()
- @volatile
- private var shutdown: Boolean = false
+ @volatile private var shutdown: Boolean = false
+ private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
debug("Instantiating Scala Sync Producer")
@@ -64,21 +60,19 @@ class SyncProducer(val config: SyncProdu
var response: Tuple2[Receive, Int] = null
try {
- sendRequest(request, channel)
- response = getResponse(channel)
+ blockingChannel.send(request)
+ response = blockingChannel.receive()
} catch {
case e: java.io.IOException =>
// no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
disconnect()
- println("sdfsdfsdf")
throw e
- case e => println("other sdfsdfsdfs"); throw e
+ case e => throw e
}
// TODO: do we still need this?
sentOnConnection += 1
if(sentOnConnection >= config.reconnectInterval) {
- disconnect()
- channel = connect()
+ reconnect()
sentOnConnection = 0
}
SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
@@ -119,41 +113,38 @@ class SyncProducer(val config: SyncProdu
throw new MessageSizeTooLargeException
}
+ private def reconnect() {
+ disconnect()
+ connect()
+ }
+
/**
* Disconnect from current channel, closing connection.
* Side effect: channel field is set to null on successful disconnect
*/
private def disconnect() {
try {
- if(channel != null) {
+ if(blockingChannel.isConnected) {
info("Disconnecting from " + config.host + ":" + config.port)
- swallow(channel.close())
- swallow(channel.socket.close())
- channel = null
+ blockingChannel.disconnect()
}
} catch {
case e: Exception => error("Error on disconnect: ", e)
}
}
- private def connect(): SocketChannel = {
+ private def connect(): BlockingChannel = {
var connectBackoffMs = 1
val beginTimeMs = SystemTime.milliseconds
- while(channel == null && !shutdown) {
+ while(!blockingChannel.isConnected && !shutdown) {
try {
- channel = SocketChannel.open()
- channel.socket.setSendBufferSize(config.bufferSize)
- channel.configureBlocking(true)
- channel.socket.setSoTimeout(config.socketTimeoutMs)
- channel.socket.setKeepAlive(true)
- channel.connect(new InetSocketAddress(config.host, config.port))
+ blockingChannel.connect()
info("Connected to " + config.host + ":" + config.port + " for producing")
- }
- catch {
+ } catch {
case e: Exception => {
disconnect()
val endTimeMs = SystemTime.milliseconds
- if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs) {
+ if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs ) {
error("Producer connection to " + config.host + ":" + config.port + " timing out after " + config.connectTimeoutMs + " ms", e)
throw e
}
@@ -163,12 +154,12 @@ class SyncProducer(val config: SyncProdu
}
}
}
- channel
+ blockingChannel
}
private def getOrMakeConnection() {
- if(channel == null) {
- channel = connect()
+ if(!blockingChannel.isConnected) {
+ connect()
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Mar 26 17:12:30 2012
@@ -24,8 +24,8 @@ import kafka.producer._
import kafka.serializer.Encoder
import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
import kafka.utils.{Utils, Logging}
+import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing
private val partitioner: Partitioner[K], // use the other constructor
@@ -42,17 +42,21 @@ class DefaultEventHandler[K,V](config: P
def handle(events: Seq[ProducerData[K,V]]) {
lock synchronized {
- val serializedData = serialize(events)
+ val serializedData = serialize(events)
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries
- Stream.continually(dispatchSerializedData(outstandingProduceRequests))
- .takeWhile(requests => (remainingRetries > 0) && (requests.size > 0)).foreach {
- currentOutstandingRequests =>
- outstandingProduceRequests = currentOutstandingRequests
+ while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+ outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
+ if (outstandingProduceRequests.size > 0) {
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
- brokerPartitionInfo.updateInfo()
+ Utils.swallowError(brokerPartitionInfo.updateInfo())
remainingRetries -= 1
+ }
+ }
+ if(outstandingProduceRequests.size > 0) {
+ error("Failed to send the following reqeusts: " + outstandingProduceRequests)
+ throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
}
}
}
@@ -70,7 +74,7 @@ class DefaultEventHandler[K,V](config: P
if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
}
- }catch {
+ } catch {
case t: Throwable => error("Failed to send messages")
}
failedProduceRequests
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Mar 26 17:12:30 2012
@@ -701,20 +701,6 @@ object Utils extends Logging {
case _ => // swallow
}
}
-
- def sendRequest(request: Request, channel: SocketChannel) = {
- val send = new BoundedByteBufferSend(request)
- send.writeCompletely(channel)
- }
-
- def getResponse(channel: SocketChannel): Tuple2[Receive,Int] = {
- val response = new BoundedByteBufferReceive()
- response.readCompletely(channel)
-
- // this has the side effect of setting the initial position of buffer correctly
- val errorCode: Int = response.buffer.getShort
- (response, errorCode)
- }
}
class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Mar 26 17:12:30 2012
@@ -18,18 +18,19 @@
package kafka.producer
import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
-import org.I0Itec.zkclient.ZkClient
-import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
import java.util.Properties
-import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-import kafka.utils.{TestZKUtils, Utils, TestUtils}
-import kafka.message.Message
import kafka.admin.CreateTopicCommand
import kafka.api.FetchRequestBuilder
+import kafka.common.FailedToSendMessageException
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.log4j.{Level, Logger}
+import org.I0Itec.zkclient.ZkClient
import org.junit.Assert._
+import org.junit.Test
+import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId1 = 0
@@ -77,7 +78,9 @@ class ProducerTest extends JUnit3Suite w
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)
server1.shutdown
+ server1.awaitShutdown()
server2.shutdown
+ server2.awaitShutdown()
Utils.rm(server1.config.logDir)
Utils.rm(server2.config.logDir)
Thread.sleep(500)
@@ -120,107 +123,63 @@ class ProducerTest extends JUnit3Suite w
}
} catch {
case e: Exception => fail("Not expected", e)
+ } finally {
+ producer.close
}
- producer.close
}
-// @Test
-// def testZKSendWithDeadBroker() {
-// val props = new Properties()
-// props.put("serializer.class", "kafka.serializer.StringEncoder")
-// props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-// props.put("zk.connect", TestZKUtils.zookeeperConnect)
-//
-// // create topic
-// CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0")
-//
-// val config = new ProducerConfig(props)
-//
-// val producer = new Producer[String, String](config)
-// val message = new Message("test1".getBytes)
-// try {
-//// // kill 2nd broker
-//// server1.shutdown
-//// Thread.sleep(100)
-//
-// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-// // all partitions have broker 0 as the leader.
-// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-// Thread.sleep(100)
-//
-// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-// Thread.sleep(3000)
-//
-// // restart server 1
-//// server1.startup()
-//// Thread.sleep(100)
-//
-// // cross check if brokers got the messages
-// val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-// val messageSet = response.messageSet("new-topic", 0).iterator
-// var numMessagesReceived = 0
-// while(messageSet.hasNext) {
-// val messageAndOffset = messageSet.next()
-// assertEquals(message, messageSet.next.message)
-// println("Received message at offset %d".format(messageAndOffset.offset))
-// numMessagesReceived += 1
-// }
-// assertEquals("Message set should have 2 messages", 2, numMessagesReceived)
-// } catch {
-// case e: Exception => fail("Not expected", e)
-// }
-// producer.close
-// }
-
- // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
- // and when leader logic is changed.
-// @Test
-// def testZKSendWithDeadBroker2() {
-// val props = new Properties()
-// props.put("serializer.class", "kafka.serializer.StringEncoder")
-// props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-// props.put("socket.timeout.ms", "200")
-// props.put("zk.connect", TestZKUtils.zookeeperConnect)
-//
-// // create topic
-// CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-//
-// val config = new ProducerConfig(props)
-//
-// val producer = new Producer[String, String](config)
-// try {
-// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-// // all partitions have broker 0 as the leader.
-// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-// Thread.sleep(100)
-// // kill 2nd broker
-// server1.shutdown
-// Thread.sleep(500)
-//
-// // Since all partitions are unavailable, this request will be dropped
-// try {
-// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-// fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-// } catch {
-// case e: kafka.common.FailedToSendMessageException => // success
-// case e => fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-// }
-//
-// // restart server 1
-// server1.startup()
-// Thread.sleep(200)
-//
-// // cross check if brokers got the messages
-// val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-// val messageSet1 = response1.messageSet("new-topic", 0).iterator
-// assertTrue("Message set should have 1 message", messageSet1.hasNext)
-// assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-// assertFalse("Message set should not have more than 1 message", messageSet1.hasNext)
-// } catch {
-// case e: Exception => fail("Not expected", e)
-// }
-// producer.close
-// }
+ @Test
+ def testZKSendWithDeadBroker() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+ props.put("socket.timeout.ms", "2000")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ // create topic
+ CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+ try {
+ // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
+ // on broker 0
+ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+ Thread.sleep(100)
+ } catch {
+ case e => fail("Unexpected exception: " + e)
+ }
+
+ // kill the broker
+ server1.shutdown
+ server1.awaitShutdown()
+ Thread.sleep(100)
+
+ try {
+ // These sends should fail since there are no available brokers
+ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+ Thread.sleep(100)
+ fail("Should fail since no leader exists for the partition.")
+ } catch {
+ case e => // success
+ }
+
+ // restart server 1
+ server1.startup()
+ Thread.sleep(500)
+
+ try {
+ // cross check if broker 1 got the messages
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet1 = response1.messageSet("new-topic", 0).iterator
+ assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ assertFalse("Message set should have another message", messageSet1.hasNext)
+ } catch {
+ case e: Exception => fail("Not expected", e)
+ }
+ producer.close
+ }
@Test
def testZKSendToExistingTopicWithNoBrokers() {
@@ -250,6 +209,7 @@ class ProducerTest extends JUnit3Suite w
// shutdown server2
server2.shutdown
+ server2.awaitShutdown()
Thread.sleep(100)
// delete the new-topic logs
Utils.rm(server2.config.logDir)
@@ -279,5 +239,56 @@ class ProducerTest extends JUnit3Suite w
producer.close
}
}
+
+ @Test
+ def testAsyncSendCanCorrectlyFailWithTimeout() {
+ val timeoutMs = 500
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+ props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+
+ // create topics in ZK
+ CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+
+ // do a simple test to make sure plumbing is okay
+ try {
+ // this message should be assigned to partition 0 whose leader is on broker 0
+ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+ Thread.sleep(100)
+ // cross check if brokers got the messages
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet1 = response1.messageSet("new-topic", 0).iterator
+ assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ assertEquals(new Message("test".getBytes), messageSet1.next.message)
+ } catch {
+ case e => case e: Exception => producer.close; fail("Not expected", e)
+ }
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server1.requestHandlerPool.shutdown()
+
+ val t1 = SystemTime.milliseconds
+ try {
+ // this message should be assigned to partition 0 whose leader is on broker 0, but
+ // broker 0 will not response within timeoutMs millis.
+ producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+ } catch {
+ case e: FailedToSendMessageException => /* success */
+ case e: Exception => fail("Not expected", e)
+ } finally {
+ producer.close
+ }
+ val t2 = SystemTime.milliseconds
+
+ // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
+ // we do this because the DefaultEventHandler retries a number of times
+ assertTrue((t2-t1) >= timeoutMs*config.producerRetries)
+ }
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Mar 26 17:12:30 2012
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
+import java.net.SocketTimeoutException
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2);
@@ -90,7 +91,7 @@ class SyncProducerTest extends JUnit3Sui
}
@Test
- def testProduceBlocksWhenRequired() {
+ def testProduceCorrectlyReceivesResponse() {
// TODO: this will need to change with kafka-44
val server = servers.head
val props = new Properties()
@@ -134,4 +135,37 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
Assert.assertEquals(-1, response2.offsets(1))
}
+
+ @Test
+ def testProducerCanTimeout() {
+ val timeoutMs = 500
+
+ val server = servers.head
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", server.socketServer.port.toString)
+ props.put("buffer.size", "102400")
+ props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+
+ val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
+ val request = TestUtils.produceRequest("topic1", 0, messages)
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server.requestHandlerPool.shutdown()
+
+ val t1 = SystemTime.milliseconds
+ try {
+ val response2 = producer.send(request)
+ Assert.fail("Should have received timeout exception since request handling is stopped.")
+ } catch {
+ case e: SocketTimeoutException => /* success */
+ case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
+ }
+ val t2 = SystemTime.milliseconds
+
+ // make sure we don't wait fewer than timeoutMs for a response
+ Assert.assertTrue((t2-t1) >= timeoutMs)
+ }
}