You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/05 04:49:57 UTC
svn commit: r1179043 - in
/incubator/kafka/trunk/core/src/test/scala/unit/kafka:
javaapi/producer/SyncProducerTest.scala producer/SyncProducerTest.scala
Author: nehanarkhede
Date: Wed Oct 5 02:49:56 2011
New Revision: 1179043
URL: http://svn.apache.org/viewvc?rev=1179043&view=rev
Log:
KAFKA-146 testUnreachableServer sporadically fails; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1179043&r1=1179042&r2=1179043&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Wed Oct 5 02:49:56 2011
@@ -49,46 +49,6 @@ class SyncProducerTest extends JUnitSuit
}
@Test
- def testUnreachableServer() {
- val props = new Properties()
- props.put("host", "NOT_USED")
- props.put("port", "9092")
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "1000")
- val producer = new SyncProducer(new SyncProducerConfig(props))
- var failed = false
- val firstStart = SystemTime.milliseconds
-
- //temporarily increase log4j level to avoid error in output
- simpleProducerLogger.setLevel(Level.FATAL)
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message(messageBytes))))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- failed = false
- val firstEnd = SystemTime.milliseconds
- println("First message send retries took " + (firstEnd-firstStart) + " ms")
- Assert.assertTrue((firstEnd-firstStart) < 300)
-
- val secondStart = SystemTime.milliseconds
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message(messageBytes))))
- }catch {
- case e: Exception => failed = true
-
- }
- val secondEnd = SystemTime.milliseconds
- println("Second message send retries took " + (secondEnd-secondStart) + " ms")
- Assert.assertTrue((secondEnd-secondEnd) < 300)
- simpleProducerLogger.setLevel(Level.ERROR)
- }
-
- @Test
def testReachableServer() {
val props = new Properties()
props.put("host", "localhost")
@@ -130,42 +90,6 @@ class SyncProducerTest extends JUnitSuit
Assert.assertFalse(failed)
}
- @Test
- def testReachableServerWrongPort() {
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9091")
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
- val producer = new SyncProducer(new SyncProducerConfig(props))
- var failed = false
- val firstStart = SystemTime.milliseconds
- //temporarily increase log4j level to avoid error in output
- simpleProducerLogger.setLevel(Level.FATAL)
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message(messageBytes))))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- failed = false
- val firstEnd = SystemTime.milliseconds
- Assert.assertTrue((firstEnd-firstStart) < 300)
- val secondStart = SystemTime.milliseconds
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message(messageBytes))))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- val secondEnd = SystemTime.milliseconds
- Assert.assertTrue((secondEnd-secondEnd) < 300)
- simpleProducerLogger.setLevel(Level.ERROR)
- }
-
private def getMessageList(message: Message): java.util.List[Message] = {
val messageList = new java.util.ArrayList[Message]()
messageList.add(message)
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1179043&r1=1179042&r2=1179043&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Oct 5 02:49:56 2011
@@ -48,43 +48,6 @@ class SyncProducerTest extends JUnitSuit
}
@Test
- def testUnreachableServer() {
- val props = new Properties()
- props.put("host", "NOT_USED")
- props.put("port", server.socketServer.port.toString)
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "1000")
- val producer = new SyncProducer(new SyncProducerConfig(props))
- var failed = false
- val firstStart = SystemTime.milliseconds
-
- //temporarily increase log4j level to avoid error in output
- simpleProducerLogger.setLevel(Level.FATAL)
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- failed = false
- val firstEnd = SystemTime.milliseconds
- println("First message send retries took " + (firstEnd-firstStart) + " ms")
- Assert.assertTrue((firstEnd-firstStart) < 300)
-
- val secondStart = SystemTime.milliseconds
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
- }catch {
- case e: Exception => failed = true
- }
- val secondEnd = SystemTime.milliseconds
- println("Second message send retries took " + (secondEnd-secondStart) + " ms")
- Assert.assertTrue((secondEnd-secondStart) < 300)
- simpleProducerLogger.setLevel(Level.ERROR)
- }
-
- @Test
def testReachableServer() {
val props = new Properties()
props.put("host", "localhost")
@@ -123,40 +86,6 @@ class SyncProducerTest extends JUnitSuit
}
@Test
- def testReachableServerWrongPort() {
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", (server.socketServer.port + 1).toString) // the wrong port
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
- val producer = new SyncProducer(new SyncProducerConfig(props))
- var failed = false
- val firstStart = SystemTime.milliseconds
- //temporarily increase log4j level to avoid error in output
- simpleProducerLogger.setLevel(Level.FATAL)
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- failed = false
- val firstEnd = SystemTime.milliseconds
- Assert.assertTrue((firstEnd-firstStart) < 300)
- val secondStart = SystemTime.milliseconds
- try {
- producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
- }catch {
- case e: Exception => failed = true
- }
- Assert.assertTrue(failed)
- val secondEnd = SystemTime.milliseconds
- Assert.assertTrue((secondEnd-secondStart) < 300)
- simpleProducerLogger.setLevel(Level.ERROR)
- }
-
- @Test
def testMessageSizeTooLarge() {
val props = new Properties()
props.put("host", "localhost")