Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/26 19:12:31 UTC

svn commit: r1305454 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/network/ main/scala/kafka/producer/ main/scala/kafka/producer/async/ main/scala/kafka/utils/ test/scala/unit/kafka/producer/

Author: junrao
Date: Mon Mar 26 17:12:30 2012
New Revision: 1305454

URL: http://svn.apache.org/viewvc?rev=1305454&view=rev
Log:
SyncProducer does not correctly timeout; patched by Prashanth Menon; reviewed by Jun Rao, Neha Narkhede; KAFKA-305
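
The root cause: in blocking mode, java.nio.channels.SocketChannel.read() does not honor the socket's SO_TIMEOUT, so a fetch or produce against a stalled broker blocks indefinitely no matter what socket.timeout.ms is set to. The new BlockingChannel added below works around this by reading through a channel wrapped around the socket's input stream, which does respect the timeout. A minimal sketch of the pattern, with an illustrative host, port, and timeout:

    import java.net.InetSocketAddress
    import java.nio.ByteBuffer
    import java.nio.channels.{Channels, ReadableByteChannel, SocketChannel}

    val channel = SocketChannel.open()
    channel.configureBlocking(true)
    channel.socket.setSoTimeout(500)   // ignored by channel.read(...)
    channel.connect(new InetSocketAddress("localhost", 9092))

    // channel.read(buf) would block forever if the peer never responds.
    // A read through the socket's input stream honors SO_TIMEOUT and
    // throws java.net.SocketTimeoutException after 500 ms instead:
    val readChannel: ReadableByteChannel =
      Channels.newChannel(channel.socket.getInputStream)
    readChannel.read(ByteBuffer.allocate(4))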

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Mar 26 17:12:30 2012
@@ -17,91 +17,89 @@
 
 package kafka.consumer
 
-import java.net._
-import java.nio.channels._
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.utils.Utils._
 
 /**
  * A consumer of kafka messages
  */
 @threadsafe
-class SimpleConsumer(val host: String,
-                     val port: Int,
-                     val soTimeout: Int,
-                     val bufferSize: Int) extends Logging {
-  private var channel : SocketChannel = null
+class SimpleConsumer( val host: String,
+                      val port: Int,
+                      val soTimeout: Int,
+                      val bufferSize: Int ) extends Logging {
+
   private val lock = new Object()
+  private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout)
 
-  private def connect(): SocketChannel = {
+  private def connect(): BlockingChannel = {
     close
-    val address = new InetSocketAddress(host, port)
+    blockingChannel.connect()
+    blockingChannel
+  }
+
+  private def disconnect() = {
+    if(blockingChannel.isConnected) {
+      debug("Disconnecting from " + host + ":" + port)
+      blockingChannel.disconnect()
+    }
+  }
 
-    val channel = SocketChannel.open
-    debug("Connected to " + address + " for fetching.")
-    channel.configureBlocking(true)
-    channel.socket.setReceiveBufferSize(bufferSize)
-    channel.socket.setSoTimeout(soTimeout)
-    channel.socket.setKeepAlive(true)
-    channel.connect(address)
-    trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
-    trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
-    
-    channel
-  }
-
-  private def close(channel: SocketChannel) = {
-    debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
-    swallow(channel.close())
-    swallow(channel.socket.close())
+  private def reconnect() {
+    disconnect()
+    connect()
   }
 
   def close() {
     lock synchronized {
-      if (channel != null)
-        close(channel)
-      channel = null
+      disconnect()
     }
   }
-
-  /**
-   *  Fetch a set of messages from a topic.
-   *
-   *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
-   *  @return a set of fetched messages
-   */
-  def fetch(request: FetchRequest): FetchResponse = {
+  
+  private def sendRequest(request: Request): Tuple2[Receive, Int] = {
     lock synchronized {
-      val startTime = SystemTime.nanoseconds
       getOrMakeConnection()
       var response: Tuple2[Receive,Int] = null
       try {
-        sendRequest(request, channel)
-        response = getResponse(channel)
+        blockingChannel.send(request)
+        response = blockingChannel.receive()
       } catch {
         case e : java.io.IOException =>
-          info("Reconnect in fetch request due to socket error: ", e)
+          info("Reconnect in due to socket error: ", e)
           // retry once
           try {
-            channel = connect
-            sendRequest(request, channel)
-            response = getResponse(channel)
+            reconnect()
+            blockingChannel.send(request)
+            response = blockingChannel.receive()
           } catch {
-            case ioe: java.io.IOException => channel = null; throw ioe;
+            case ioe: java.io.IOException =>
+              disconnect()
+              throw ioe
           }
         case e => throw e
       }
-      val fetchResponse = FetchResponse.readFrom(response._1.buffer)
-      val fetchedSize = fetchResponse.sizeInBytes
+      response
+    }
+  }
 
-      val endTime = SystemTime.nanoseconds
-      SimpleConsumerStats.recordFetchRequest(endTime - startTime)
-      SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  def fetch(request: FetchRequest): FetchResponse = {
+    val startTime = SystemTime.nanoseconds
+    val response = sendRequest(request)
+    val fetchResponse = FetchResponse.readFrom(response._1.buffer)
+    val fetchedSize = fetchResponse.sizeInBytes
+
+    val endTime = SystemTime.nanoseconds
+    SimpleConsumerStats.recordFetchRequest(endTime - startTime)
+    SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
 
-      fetchResponse
-    }
+    fetchResponse
   }
 
   /**
@@ -112,31 +110,14 @@ class SimpleConsumer(val host: String,
    *  @return an array of offsets
    */
   def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
-    lock synchronized {
-      getOrMakeConnection()
-      var response: Tuple2[Receive,Int] = null
-      try {
-        sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
-        response = getResponse(channel)
-      } catch {
-        case e : java.io.IOException =>
-          info("Reconnect in get offetset request due to socket error: ", e)
-          // retry once
-          try {
-            channel = connect
-            sendRequest(new OffsetRequest(topic, partition, time, maxNumOffsets), channel)
-            response = getResponse(channel)
-          } catch {
-            case ioe: java.io.IOException => channel = null; throw ioe;
-          }
-      }
-      OffsetRequest.deserializeOffsetArray(response._1.buffer)
-    }
+    val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
+    val response = sendRequest(request)
+    OffsetRequest.deserializeOffsetArray(response._1.buffer)
   }
 
   private def getOrMakeConnection() {
-    if(channel == null) {
-      channel = connect()
+    if(!blockingChannel.isConnected) {
+      connect()
     }
   }
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1305454&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Mon Mar 26 17:12:30 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetSocketAddress
+import java.nio.channels._
+import kafka.utils.{nonthreadsafe, Logging}
+
+/**
+ *  A simple blocking channel with timeouts correctly enabled.
+ *
+ */
+@nonthreadsafe
+class BlockingChannel( val host: String, 
+                       val port: Int, 
+                       val readBufferSize: Int, 
+                       val writeBufferSize: Int, 
+                       val readTimeoutMs: Int ) extends Logging {
+
+  private var connected = false
+  private var channel: SocketChannel = null
+  private var readChannel: ReadableByteChannel = null
+  private var writeChannel: GatheringByteChannel = null
+  private val lock = new Object()
+  
+  def connect() = lock synchronized  {
+    if(!connected) {
+      channel = SocketChannel.open()
+      if(readBufferSize > 0)
+        channel.socket.setReceiveBufferSize(readBufferSize)
+      if(writeBufferSize > 0)
+        channel.socket.setSendBufferSize(writeBufferSize)
+      channel.configureBlocking(true)
+      channel.socket.setSoTimeout(readTimeoutMs)
+      channel.socket.setKeepAlive(true)
+      channel.connect(new InetSocketAddress(host, port))
+
+      writeChannel = channel
+      readChannel = Channels.newChannel(channel.socket().getInputStream)
+      connected = true
+    }
+  }
+  
+  def disconnect() = lock synchronized {
+    if(connected || channel != null) {
+      // closing the main socket channel *should* close the read channel
+      // but let's do it to be sure.
+      swallow(channel.close())
+      swallow(channel.socket.close())
+      swallow(readChannel.close())
+      channel = null; readChannel = null; writeChannel = null
+      connected = false
+    }
+  }
+
+  def isConnected = connected
+  
+  def send(request: Request):Int = {
+    if(!connected)
+      throw new ClosedChannelException()
+
+    val send = new BoundedByteBufferSend(request)
+    send.writeCompletely(writeChannel)
+  }
+  
+  def receive(): Tuple2[Receive, Int] = {
+    if(!connected)
+      throw new ClosedChannelException()
+
+    val response = new BoundedByteBufferReceive()
+    response.readCompletely(readChannel)
+
+    // this has the side effect of setting the initial position of buffer correctly
+    val errorCode: Int = response.buffer.getShort
+    (response, errorCode)
+  }
+
+}
\ No newline at end of file
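
For reference, a hypothetical caller would drive BlockingChannel the same way SimpleConsumer and SyncProducer do elsewhere in this commit (the request value and the buffer/timeout settings below are illustrative):

    val channel = new BlockingChannel("localhost", 9092,
                                      readBufferSize = 64 * 1024,
                                      writeBufferSize = 0,
                                      readTimeoutMs = 500)
    channel.connect()
    try {
      channel.send(request)   // request: kafka.network.Request
      val (receive, errorCode) = channel.receive()
      // receive.buffer is now positioned just past the leading error code
    } finally {
      channel.disconnect()
    }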

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/Transmission.scala Mon Mar 26 17:12:30 2012
@@ -50,12 +50,13 @@ trait Receive extends Transmission {
   def readFrom(channel: ReadableByteChannel): Int
   
   def readCompletely(channel: ReadableByteChannel): Int = {
-    var read = 0
+    var totalRead = 0
     while(!complete) {
-      read = readFrom(channel)
+      val read = readFrom(channel)
       trace(read + " bytes read.")
+      totalRead += read
     }
-    read
+    totalRead
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Mon Mar 26 17:12:30 2012
@@ -17,14 +17,11 @@
 
 package kafka.producer
 
-import java.net.InetSocketAddress
-import java.nio.channels.SocketChannel
 import kafka.api._
 import kafka.common.MessageSizeTooLargeException
 import kafka.message.MessageSet
-import kafka.network.{BoundedByteBufferSend, Request, Receive}
+import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
 import kafka.utils._
-import kafka.utils.Utils._
 
 /*
  * Send a message set.
@@ -33,11 +30,10 @@ import kafka.utils.Utils._
 class SyncProducer(val config: SyncProducerConfig) extends Logging {
   
   private val MaxConnectBackoffMs = 60000
-  private var channel : SocketChannel = null
   private var sentOnConnection = 0
   private val lock = new Object()
-  @volatile
-  private var shutdown: Boolean = false
+  @volatile private var shutdown: Boolean = false
+  private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
 
   debug("Instantiating Scala Sync Producer")
 
@@ -64,21 +60,19 @@ class SyncProducer(val config: SyncProdu
 
       var response: Tuple2[Receive, Int] = null
       try {
-        sendRequest(request, channel)
-        response = getResponse(channel)
+        blockingChannel.send(request)
+        response = blockingChannel.receive()
       } catch {
         case e: java.io.IOException =>
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
           disconnect()
-          println("sdfsdfsdf")
           throw e
-        case e => println("other sdfsdfsdfs"); throw e
+        case e => throw e
       }
       // TODO: do we still need this?
       sentOnConnection += 1
       if(sentOnConnection >= config.reconnectInterval) {
-        disconnect()
-        channel = connect()
+        reconnect()
         sentOnConnection = 0
       }
       SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
@@ -119,41 +113,38 @@ class SyncProducer(val config: SyncProdu
         throw new MessageSizeTooLargeException
   }
 
+  private def reconnect() {
+    disconnect()
+    connect()
+  }
+
   /**
    * Disconnect from current channel, closing connection.
    * Side effect: channel field is set to null on successful disconnect
    */
   private def disconnect() {
     try {
-      if(channel != null) {
+      if(blockingChannel.isConnected) {
         info("Disconnecting from " + config.host + ":" + config.port)
-        swallow(channel.close())
-        swallow(channel.socket.close())
-        channel = null
+        blockingChannel.disconnect()
       }
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
     }
   }
     
-  private def connect(): SocketChannel = {
+  private def connect(): BlockingChannel = {
     var connectBackoffMs = 1
     val beginTimeMs = SystemTime.milliseconds
-    while(channel == null && !shutdown) {
+    while(!blockingChannel.isConnected && !shutdown) {
       try {
-        channel = SocketChannel.open()
-        channel.socket.setSendBufferSize(config.bufferSize)
-        channel.configureBlocking(true)
-        channel.socket.setSoTimeout(config.socketTimeoutMs)
-        channel.socket.setKeepAlive(true)
-        channel.connect(new InetSocketAddress(config.host, config.port))
+        blockingChannel.connect()
         info("Connected to " + config.host + ":" + config.port + " for producing")
-      }
-      catch {
+      } catch {
         case e: Exception => {
           disconnect()
           val endTimeMs = SystemTime.milliseconds
-          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs) {
+          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs ) {
             error("Producer connection to " +  config.host + ":" + config.port + " timing out after " + config.connectTimeoutMs + " ms", e)
             throw e
           }
@@ -163,12 +154,12 @@ class SyncProducer(val config: SyncProdu
         }
       }
     }
-    channel
+    blockingChannel
   }
 
   private def getOrMakeConnection() {
-    if(channel == null) {
-      channel = connect()
+    if(!blockingChannel.isConnected) {
+      connect()
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Mon Mar 26 17:12:30 2012
@@ -24,8 +24,8 @@ import kafka.producer._
 import kafka.serializer.Encoder
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
 import kafka.utils.{Utils, Logging}
+import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,                               // this api is for testing
                                private val partitioner: Partitioner[K],              // use the other constructor
@@ -42,17 +42,21 @@ class DefaultEventHandler[K,V](config: P
 
   def handle(events: Seq[ProducerData[K,V]]) {
     lock synchronized {
-     val serializedData = serialize(events)
+      val serializedData = serialize(events)
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries
-      Stream.continually(dispatchSerializedData(outstandingProduceRequests))
-                        .takeWhile(requests => (remainingRetries > 0) && (requests.size > 0)).foreach {
-        currentOutstandingRequests =>
-          outstandingProduceRequests = currentOutstandingRequests
+      while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+        outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
+        if (outstandingProduceRequests.size > 0)  {
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.producerRetryBackoffMs)
-          brokerPartitionInfo.updateInfo()
+          Utils.swallowError(brokerPartitionInfo.updateInfo())
           remainingRetries -= 1
+        }
+      }
+      if(outstandingProduceRequests.size > 0) {
+        error("Failed to send the following reqeusts: " + outstandingProduceRequests)
+        throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
     }
   }
@@ -70,7 +74,7 @@ class DefaultEventHandler[K,V](config: P
         if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
           failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
       }
-    }catch {
+    } catch {
       case t: Throwable => error("Failed to send messages")
     }
     failedProduceRequests

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Mar 26 17:12:30 2012
@@ -701,20 +701,6 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
-
-  def sendRequest(request: Request, channel: SocketChannel) = {
-    val send = new BoundedByteBufferSend(request)
-    send.writeCompletely(channel)
-  }
-
-  def getResponse(channel: SocketChannel): Tuple2[Receive,Int] = {
-    val response = new BoundedByteBufferReceive()
-    response.readCompletely(channel)
-
-    // this has the side effect of setting the initial position of buffer correctly
-    val errorCode: Int = response.buffer.getShort
-    (response, errorCode)
-  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Mar 26 17:12:30 2012
@@ -18,18 +18,19 @@
 package kafka.producer
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.consumer.SimpleConsumer
-import org.I0Itec.zkclient.ZkClient
-import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import java.util.Properties
-import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-import kafka.utils.{TestZKUtils, Utils, TestUtils}
-import kafka.message.Message
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
+import kafka.common.FailedToSendMessageException
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.log4j.{Level, Logger}
+import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
+import org.junit.Test
+import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -77,7 +78,9 @@ class ProducerTest extends JUnit3Suite w
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
     server1.shutdown
+    server1.awaitShutdown()
     server2.shutdown
+    server2.awaitShutdown()
     Utils.rm(server1.config.logDir)
     Utils.rm(server2.config.logDir)    
     Thread.sleep(500)
@@ -120,107 +123,63 @@ class ProducerTest extends JUnit3Suite w
       }
     } catch {
       case e: Exception => fail("Not expected", e)
+    } finally {
+      producer.close
     }
-    producer.close
   }
 
-//  @Test
-//  def testZKSendWithDeadBroker() {
-//    val props = new Properties()
-//    props.put("serializer.class", "kafka.serializer.StringEncoder")
-//    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-//    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-//
-//    // create topic
-//    CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0")
-//
-//    val config = new ProducerConfig(props)
-//
-//    val producer = new Producer[String, String](config)
-//    val message = new Message("test1".getBytes)
-//    try {
-////      // kill 2nd broker
-////      server1.shutdown
-////      Thread.sleep(100)
-//
-//      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-//      // all partitions have broker 0 as the leader.
-//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//      Thread.sleep(100)
-//
-//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//      Thread.sleep(3000)
-//
-//      // restart server 1
-////      server1.startup()
-////      Thread.sleep(100)
-//
-//      // cross check if brokers got the messages
-//      val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-//      val messageSet = response.messageSet("new-topic", 0).iterator
-//      var numMessagesReceived = 0
-//      while(messageSet.hasNext) {
-//        val messageAndOffset = messageSet.next()
-//        assertEquals(message, messageSet.next.message)
-//        println("Received message at offset %d".format(messageAndOffset.offset))
-//        numMessagesReceived += 1
-//      }
-//      assertEquals("Message set should have 2 messages", 2, numMessagesReceived)
-//    } catch {
-//      case e: Exception => fail("Not expected", e)
-//    }
-//    producer.close
-//  }
-
-  // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
-  //       and when leader logic is changed.
-//  @Test
-//  def testZKSendWithDeadBroker2() {
-//    val props = new Properties()
-//    props.put("serializer.class", "kafka.serializer.StringEncoder")
-//    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-//    props.put("socket.timeout.ms", "200")
-//    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-//
-//    // create topic
-//    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-//
-//    val config = new ProducerConfig(props)
-//
-//    val producer = new Producer[String, String](config)
-//    try {
-//      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-//      // all partitions have broker 0 as the leader.
-//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//      Thread.sleep(100)
-//      // kill 2nd broker
-//      server1.shutdown
-//      Thread.sleep(500)
-//
-//      // Since all partitions are unavailable, this request will be dropped
-//      try {
-//        producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-//        fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-//      } catch {
-//        case e: kafka.common.FailedToSendMessageException => // success
-//        case e => fail("Leader broker for \"new-topic\" isn't up, should not be able to send data")
-//      }
-//
-//      // restart server 1
-//      server1.startup()
-//      Thread.sleep(200)
-//
-//      // cross check if brokers got the messages
-//      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-//      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-//      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-//      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-//      assertFalse("Message set should not have more than 1 message", messageSet1.hasNext)
-//    } catch {
-//      case e: Exception => fail("Not expected", e)
-//    }
-//    producer.close
-//  }
+  @Test
+  def testZKSendWithDeadBroker() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("socket.timeout.ms", "2000")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+    try {
+      // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
+      // on broker 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+    } catch {
+      case e => fail("Unexpected exception: " + e)
+    }
+
+    // kill the broker
+    server1.shutdown
+    server1.awaitShutdown()
+    Thread.sleep(100)
+
+    try {
+      // These sends should fail since there are no available brokers
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      fail("Should fail since no leader exists for the partition.")
+    } catch {
+      case e => // success
+    }
+
+    // restart server 1
+    server1.startup()
+    Thread.sleep(500)
+
+    try {
+      // cross check if broker 1 got the messages
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertFalse("Message set should have another message", messageSet1.hasNext)
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+    producer.close
+  }
 
   @Test
   def testZKSendToExistingTopicWithNoBrokers() {
@@ -250,6 +209,7 @@ class ProducerTest extends JUnit3Suite w
 
       // shutdown server2
       server2.shutdown
+      server2.awaitShutdown()
       Thread.sleep(100)
       // delete the new-topic logs
       Utils.rm(server2.config.logDir)
@@ -279,5 +239,56 @@ class ProducerTest extends JUnit3Suite w
       producer.close
     }
   }
+
+  @Test
+  def testAsyncSendCanCorrectlyFailWithTimeout() {
+    val timeoutMs = 500
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // create topics in ZK
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+
+    // do a simple test to make sure plumbing is okay
+    try {
+      // this message should be assigned to partition 0 whose leader is on broker 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet1.next.message)
+    } catch {
+      case e: Exception => producer.close; fail("Not expected", e)
+    }
+
+    // stop IO threads and request handling, but leave networking operational
+    // any requests should be accepted and queue up, but not handled
+    server1.requestHandlerPool.shutdown()
+
+    val t1 = SystemTime.milliseconds
+    try {
+      // this message should be assigned to partition 0 whose leader is on broker 0, but
+      // broker 0 will not respond within timeoutMs millis.
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+    } catch {
+      case e: FailedToSendMessageException => /* success */
+      case e: Exception => fail("Not expected", e)
+    } finally {
+      producer.close
+    }
+    val t2 = SystemTime.milliseconds
+
+    // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
+    // we do this because the DefaultEventHandler retries a number of times
+    assertTrue((t2-t1) >= timeoutMs*config.producerRetries)
+  }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1305454&r1=1305453&r2=1305454&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Mar 26 17:12:30 2012
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
+import java.net.SocketTimeoutException
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -90,7 +91,7 @@ class SyncProducerTest extends JUnit3Sui
   }
 
   @Test
-  def testProduceBlocksWhenRequired() {
+  def testProduceCorrectlyReceivesResponse() {
     // TODO: this will need to change with kafka-44
     val server = servers.head
     val props = new Properties()
@@ -134,4 +135,37 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
+
+  @Test
+  def testProducerCanTimeout() {
+    val timeoutMs = 500
+
+    val server = servers.head
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", server.socketServer.port.toString)
+    props.put("buffer.size", "102400")
+    props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+
+    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
+    val request = TestUtils.produceRequest("topic1", 0, messages)
+
+    // stop IO threads and request handling, but leave networking operational
+    // any requests should be accepted and queue up, but not handled
+    server.requestHandlerPool.shutdown()
+    
+    val t1 = SystemTime.milliseconds
+    try {
+      val response2 = producer.send(request)
+      Assert.fail("Should have received timeout exception since request handling is stopped.")
+    } catch {
+      case e: SocketTimeoutException => /* success */
+      case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
+    }
+    val t2 = SystemTime.milliseconds
+
+    // make sure we don't wait fewer than timeoutMs for a response
+    Assert.assertTrue((t2-t1) >= timeoutMs)
+  }
 }