You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/05 04:49:57 UTC

svn commit: r1179043 - in /incubator/kafka/trunk/core/src/test/scala/unit/kafka: javaapi/producer/SyncProducerTest.scala producer/SyncProducerTest.scala

Author: nehanarkhede
Date: Wed Oct  5 02:49:56 2011
New Revision: 1179043

URL: http://svn.apache.org/viewvc?rev=1179043&view=rev
Log:
KAFKA-146 testUnreachableServer sporadically fails; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1179043&r1=1179042&r2=1179043&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Wed Oct  5 02:49:56 2011
@@ -49,46 +49,6 @@ class SyncProducerTest extends JUnitSuit
   }
 
   @Test
-  def testUnreachableServer() {
-    val props = new Properties()
-    props.put("host", "NOT_USED")
-    props.put("port", "9092")
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "1000")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    var failed = false
-    val firstStart = SystemTime.milliseconds
-
-    //temporarily increase log4j level to avoid error in output
-    simpleProducerLogger.setLevel(Level.FATAL)
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                        messages = getMessageList(new Message(messageBytes))))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    failed = false
-    val firstEnd = SystemTime.milliseconds
-    println("First message send retries took " + (firstEnd-firstStart) + " ms")
-    Assert.assertTrue((firstEnd-firstStart) < 300)
-
-    val secondStart = SystemTime.milliseconds
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                        messages = getMessageList(new Message(messageBytes))))
-    }catch {
-      case e: Exception => failed = true
-
-    }
-    val secondEnd = SystemTime.milliseconds
-    println("Second message send retries took " + (secondEnd-secondStart) + " ms")
-    Assert.assertTrue((secondEnd-secondEnd) < 300)
-    simpleProducerLogger.setLevel(Level.ERROR)
-  }
-
-  @Test
   def testReachableServer() {
     val props = new Properties()
     props.put("host", "localhost")
@@ -130,42 +90,6 @@ class SyncProducerTest extends JUnitSuit
     Assert.assertFalse(failed)
   }
 
-  @Test
-  def testReachableServerWrongPort() {
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9091")
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    var failed = false
-    val firstStart = SystemTime.milliseconds
-    //temporarily increase log4j level to avoid error in output
-    simpleProducerLogger.setLevel(Level.FATAL)
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                        messages = getMessageList(new Message(messageBytes))))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    failed = false
-    val firstEnd = SystemTime.milliseconds
-    Assert.assertTrue((firstEnd-firstStart) < 300)
-    val secondStart = SystemTime.milliseconds
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                        messages = getMessageList(new Message(messageBytes))))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    val secondEnd = SystemTime.milliseconds
-    Assert.assertTrue((secondEnd-secondEnd) < 300)
-    simpleProducerLogger.setLevel(Level.ERROR)
-  }
-
   private def getMessageList(message: Message): java.util.List[Message] = {
     val messageList = new java.util.ArrayList[Message]()
     messageList.add(message)

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1179043&r1=1179042&r2=1179043&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Oct  5 02:49:56 2011
@@ -48,43 +48,6 @@ class SyncProducerTest extends JUnitSuit
   }
 
   @Test
-  def testUnreachableServer() {
-    val props = new Properties()
-    props.put("host", "NOT_USED")
-    props.put("port", server.socketServer.port.toString)
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "1000")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    var failed = false
-    val firstStart = SystemTime.milliseconds
-
-    //temporarily increase log4j level to avoid error in output
-    simpleProducerLogger.setLevel(Level.FATAL)
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    failed = false
-    val firstEnd = SystemTime.milliseconds
-    println("First message send retries took " + (firstEnd-firstStart) + " ms")
-    Assert.assertTrue((firstEnd-firstStart) < 300)
-
-    val secondStart = SystemTime.milliseconds
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
-    }catch {
-      case e: Exception => failed = true
-    }
-    val secondEnd = SystemTime.milliseconds
-    println("Second message send retries took " + (secondEnd-secondStart) + " ms")
-    Assert.assertTrue((secondEnd-secondStart) < 300)
-    simpleProducerLogger.setLevel(Level.ERROR)
-  }
-
-  @Test
   def testReachableServer() {
     val props = new Properties()
     props.put("host", "localhost")
@@ -123,40 +86,6 @@ class SyncProducerTest extends JUnitSuit
   }
 
   @Test
-  def testReachableServerWrongPort() {
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", (server.socketServer.port + 1).toString) // the wrong port
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    var failed = false
-    val firstStart = SystemTime.milliseconds
-    //temporarily increase log4j level to avoid error in output
-    simpleProducerLogger.setLevel(Level.FATAL)
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    failed = false
-    val firstEnd = SystemTime.milliseconds
-    Assert.assertTrue((firstEnd-firstStart) < 300)
-    val secondStart = SystemTime.milliseconds
-    try {
-      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
-    }catch {
-      case e: Exception => failed = true
-    }
-    Assert.assertTrue(failed)
-    val secondEnd = SystemTime.milliseconds
-    Assert.assertTrue((secondEnd-secondStart) < 300)
-    simpleProducerLogger.setLevel(Level.ERROR)
-  }
-
-  @Test
   def testMessageSizeTooLarge() {
     val props = new Properties()
     props.put("host", "localhost")