Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/24 20:13:04 UTC

svn commit: r1365199 [3/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/sca...

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1365199&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Jul 24 18:13:01 2012
@@ -0,0 +1,288 @@
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.message.Message
+import java.io.RandomAccessFile
+import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import org.junit.Test
+
+class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val replicaMaxLagTimeMs = 5000L
+    override val replicaMaxLagBytes = 10L
+    override val flushInterval = 10
+    override val replicaMinBytes = 20
+  })
+  val topic = "new-topic"
+  val partitionId = 0
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+
+  val port1 = TestUtils.choosePort()
+  val port2 = TestUtils.choosePort()
+
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+
+  val configProps1 = configs.head
+  val configProps2 = configs.last
+
+  val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
+  val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
+
+  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+  val sent2 = List(new Message("more".getBytes()), new Message("messages".getBytes()))
+
+  var producer: Producer[Int, Message] = null
+  var hwFile1: RandomAccessFile = null
+  var hwFile2: RandomAccessFile = null
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+  @Test
+  def testHWCheckpointNoFailuresSingleLogSegment {
+    // start both servers
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    val leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    sendMessages()
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    sendMessages()
+    // don't wait for follower to read the leader's hw
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.foreach(server => server.shutdown())
+    producer.close()
+    val leaderHW = readHW(hwFile1)
+    assertEquals(60L, leaderHW)
+    val followerHW = readHW(hwFile2)
+    assertEquals(30L, followerHW)
+    hwFile1.close()
+    hwFile2.close()
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointWithFailuresSingleLogSegment {
+    // start both servers
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    assertEquals(0L, readHW(hwFile1))
+
+    sendMessages()
+
+    // kill the server hosting the preferred replica
+    server1.shutdown()
+    assertEquals(30L, readHW(hwFile1))
+
+    // check if leader moves to the other server
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    // bring the preferred replica back
+    server1.startup()
+
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+    assertEquals(30L, readHW(hwFile1))
+    // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
+    server2.shutdown()
+    assertEquals(30L, readHW(hwFile2))
+
+    server2.startup()
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
+
+    sendMessages()
+    // give some time for follower 1 to record leader HW of 60
+    Thread.sleep(500)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.foreach(server => server.shutdown())
+    Thread.sleep(200)
+    producer.close()
+    assert(hwFile1.length() > 0)
+    assert(hwFile2.length() > 0)
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+    hwFile1.close()
+    hwFile2.close()
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointNoFailuresMultipleLogSegments {
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val replicaMaxLagTimeMs = 5000L
+      override val replicaMaxLagBytes = 10L
+      override val flushInterval = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    val leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    sendMessages(10)
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    sendMessages(10)
+
+    // give some time for follower 1 to record leader HW of 600
+    Thread.sleep(500)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.foreach(server => server.shutdown())
+    producer.close()
+    val leaderHW = readHW(hwFile1)
+    assertEquals(600L, leaderHW)
+    val followerHW = readHW(hwFile2)
+    assertEquals(600L, followerHW)
+    hwFile1.close()
+    hwFile2.close()
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointWithFailuresMultipleLogSegments {
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val replicaMaxLagTimeMs = 5000L
+      override val replicaMaxLagBytes = 10L
+      override val flushInterval = 1000
+      override val flushSchedulerThreadRate = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    val hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    val hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    sendMessages(2)
+    // allow some time for the follower to get the leader HW
+    Thread.sleep(1000)
+    // shut down both servers so their high watermarks get checkpointed
+    server1.shutdown()
+    server2.shutdown()
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+
+    server2.startup()
+    // check if leader moves to the other server
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    assertEquals(60L, readHW(hwFile1))
+
+    // bring the preferred replica back
+    server1.startup()
+
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+
+    sendMessages(2)
+    // allow some time for the follower to get the leader HW
+    Thread.sleep(1000)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.foreach(server => server.shutdown())
+    producer.close()
+    assert(hwFile1.length() > 0)
+    assert(hwFile2.length() > 0)
+    assertEquals(120L, readHW(hwFile1))
+    assertEquals(120L, readHW(hwFile2))
+    hwFile1.close()
+    hwFile2.close()
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  private def sendMessages(numMessages: Int = 1) {
+    for(i <- 0 until numMessages) {
+      producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+    }
+  }
+
+  private def readHW(hwFile: RandomAccessFile): Long = {
+    hwFile.seek(0)
+    hwFile.readLong()
+  }
+}
\ No newline at end of file

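The new test treats the high-watermark checkpoint as a single big-endian long at offset 0 of <logDir>/<topic>-<partition>/highwatermark; each sendMessages() call appends two messages and advances the HW by 30 bytes, which is where the 30L/60L/600L assertions come from. A minimal standalone sketch of the same read (the path is hypothetical; the test derives it from the broker config):

    import java.io.RandomAccessFile

    object HwInspect {
      // Mirrors readHW above: the checkpoint is a single long at offset 0.
      // RandomAccessFile.readLong reads big-endian, matching what the test assumes.
      def readHW(path: String): Long = {
        val f = new RandomAccessFile(path, "r")
        try { f.seek(0); f.readLong() } finally { f.close() }
      }

      def main(args: Array[String]) {
        // hypothetical log dir; the tests build it from config.logDir
        println(readHW("/tmp/kafka-logs/new-topic-0/highwatermark"))
      }
    }
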
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@ package kafka.server
 
 import java.io.File
 import kafka.consumer.SimpleConsumer
-import java.util.Properties
 import org.junit.Test
 import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
@@ -50,7 +49,7 @@ class ServerShutdownTest extends JUnit3S
       // create topic
       CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
-      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
 
       // send some messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@@ -65,7 +64,7 @@ class ServerShutdownTest extends JUnit3S
 
 
     {
-      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
       val consumer = new SimpleConsumer(host,
                                         port,
                                         1000000,
@@ -102,15 +101,4 @@ class ServerShutdownTest extends JUnit3S
     }
 
   }
-
-  private def getProducerConfig(bufferSize: Int, connectTimeout: Int,
-                                reconnectInterval: Int): ProducerConfig = {
-    val props = new Properties()
-    props.put("zk.connect", zkConnect)
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    props.put("buffer.size", bufferSize.toString)
-    props.put("connect.timeout.ms", connectTimeout.toString)
-    props.put("reconnect.interval", reconnectInterval.toString)
-    new ProducerConfig(props)
-  }
 }

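The change above follows from moving getProducerConfig into TestUtils (see the TestUtils diff below): the helper now takes zkConnect explicitly and returns raw Properties instead of a finished ProducerConfig, so each test can layer overrides onto the shared defaults before freezing them. A sketch of the calling convention (the object wrapper and makeProducer name are illustrative only):

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig}
    import kafka.utils.TestUtils

    object SyncProducerExample {
      // zkConnect normally comes from ZooKeeperTestHarness
      def makeProducer(zkConnect: String): Producer[Int, Message] = {
        val props: Properties = TestUtils.getProducerConfig(zkConnect, 64*1024, 100000, 10000)
        // still plain Properties, so per-test overrides are possible here:
        props.put("producer.request.required.acks", "-1") // ack only after all in-sync replicas have the data
        props.put("producer.request.timeout.ms", "1000")
        new Producer[Int, Message](new ProducerConfig(props))
      }
    }
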
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jul 24 18:13:01 2012
@@ -293,6 +293,18 @@ object TestUtils extends Logging {
     new Producer[K, V](new ProducerConfig(props))
   }
 
+  def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
+                        reconnectInterval: Int): Properties = {
+    val props = new Properties()
+    props.put("producer.type", "sync")
+    props.put("zk.connect", zkConnect)
+    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
+    props.put("buffer.size", bufferSize.toString)
+    props.put("connect.timeout.ms", connectTimeout.toString)
+    props.put("reconnect.interval", reconnectInterval.toString)
+    props
+  }
+
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
@@ -368,6 +380,11 @@ object TestUtils extends Logging {
     pr
   }
 
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+    leaderPerPartitionMap.foreach(leaderForPartition => ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic,
+      leaderForPartition._1, leaderForPartition._2))
+  }
+
   def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
     val leaderLock = new ReentrantLock()
     val leaderExists = leaderLock.newCondition()
@@ -381,7 +398,8 @@ object TestUtils extends Logging {
         case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
           leader
         case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
-          new LeaderExists(topic, partition, leaderExists))
+          new LeaderExistsListener(topic, partition, leaderLock, leaderExists))
+        info("No leader exists. Waiting for %d ms".format(timeoutMs))
         leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
           // check if leader is elected
         val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)

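The listener change above also hands the lock to LeaderExistsListener, so the signal is raised under the same lock the waiter blocks on. Stripped of the ZooKeeper plumbing, the wait pattern looks like this (onLeaderChange is a hypothetical stand-in for the listener callback):

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    class LeaderWait {
      private val lock = new ReentrantLock()
      private val leaderExists = lock.newCondition()
      private var leader: Option[Int] = None   // guarded by lock

      // stand-in for LeaderExistsListener: called from the ZooKeeper
      // event thread when the leader path is created
      def onLeaderChange(newLeader: Int) {
        lock.lock()
        try {
          leader = Some(newLeader)
          leaderExists.signalAll()
        } finally {
          lock.unlock()
        }
      }

      // mirrors waitUntilLeaderIsElected: block until signalled or timed out
      def await(timeoutMs: Long): Option[Int] = {
        lock.lock()
        try {
          if (leader.isEmpty)
            leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
          leader
        } finally {
          lock.unlock()
        }
      }
    }
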
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import org.junit.Assert._
+import kafka.common.KafkaException
 
 
 class UtilsTest extends JUnitSuite {
@@ -29,7 +30,7 @@ class UtilsTest extends JUnitSuite {
 
   @Test
   def testSwallow() {
-    Utils.swallow(logger.info, throw new IllegalStateException("test"))
+    Utils.swallow(logger.info, throw new KafkaException("test"))
   }
 
   @Test

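The test only verifies that an exception thrown inside the by-name argument is caught and logged rather than propagated. A sketch of the swallow idiom under that assumption (the real Utils.swallow signature may differ; the (AnyRef, Throwable) shape matches how logger.info is passed above):

    object SwallowSketch {
      // run an action; if it throws, log and keep going
      def swallow(log: (AnyRef, Throwable) => Unit, action: => Unit) {
        try {
          action
        } catch {
          case e: Throwable => log(e.getMessage, e)
        }
      }

      def main(args: Array[String]) {
        swallow((m, t) => println("swallowed: " + m), throw new RuntimeException("test"))
        println("still running") // reached because the exception was swallowed
      }
    }
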
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Tue Jul 24 18:13:01 2012
@@ -26,7 +26,7 @@ import kafka.utils.Utils
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
-  val tickTime = 2000
+  val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Tue Jul 24 18:13:01 2012
@@ -25,11 +25,12 @@ trait ZooKeeperTestHarness extends JUnit
   val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
-    zkClient = new ZkClient(zookeeper.connectString)
-    zkClient.setZkSerializer(ZKStringSerializer)
+    zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
     super.setUp
   }
 

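The two ZooKeeper changes are related: a ZooKeeper server clamps client session timeouts to the window [2 * tickTime, 20 * tickTime], so dropping tickTime to 500 speeds up session expiry in the failure tests while still admitting the 6000 ms timeouts the harness now passes explicitly. A quick check of that arithmetic:

    object TickTimeCheck {
      // ZooKeeper's documented negotiation window for session timeouts
      def sessionBounds(tickTime: Int): (Int, Int) = (2 * tickTime, 20 * tickTime)

      def main(args: Array[String]) {
        val (min, max) = sessionBounds(500)
        // the harness requests 6000 ms, which falls inside [1000, 10000]
        assert(min <= 6000 && 6000 <= max)
        println("session timeout window: [%d, %d] ms".format(min, max))
      }
    }
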
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Jul 24 18:13:01 2012
@@ -75,6 +75,15 @@ object ProducerPerformance extends Loggi
       .withRequiredArg
       .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
       .ofType(classOf[String])
+    val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+    val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
+      "to complete")
+      .withRequiredArg()
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
     val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
       .withRequiredArg
       .describedAs("size")
@@ -128,10 +137,11 @@ object ProducerPerformance extends Loggi
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
     val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
     val seqIdMode = options.has(initialMessageIdOpt)
-    
+    val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
+    val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
+
     // override necessary flags in seqIdMode
     if (seqIdMode) { 
-      isAsync = true
       batchSize = 1
       isFixSize = true
 
@@ -175,6 +185,9 @@ object ProducerPerformance extends Loggi
       props.put("batch.size", config.batchSize.toString)
       props.put("queue.enqueueTimeout.ms", "-1")
     }
+    props.put("producer.request.required.acks", config.produceRequestRequiredAcks.toString)
+    props.put("producer.request.timeout.ms", config.produceRequestTimeoutMs.toString)
+
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
@@ -198,12 +211,6 @@ object ProducerPerformance extends Loggi
       val topicLabel     = "Topic"
       var leftPaddedSeqId : String = ""
       
-      var messageSet: List[Message] = Nil
-      if(config.isFixSize) {
-        for(k <- 0 until config.batchSize) {
-          messageSet ::= message
-        }
-      }
       var j: Long = 0L
       while(j < messagesPerThread) {
         var strLength = config.messageSize
@@ -231,11 +238,17 @@ object ProducerPerformance extends Loggi
           debug(seqMsgString)
           message = new Message(seqMsgString.getBytes())
         }
-                
+
+        var messageSet: List[Message] = Nil
+        if(config.isFixSize) {
+          for(k <- 0 until config.batchSize) {
+            messageSet ::= message
+          }
+        }
+
         if (!config.isFixSize) {
           for(k <- 0 until config.batchSize) {
             strLength = rand.nextInt(config.messageSize)
-            val message = new Message(getByteArrayOfLength(strLength))
             messageSet ::= message
             bytesSent += message.payloadSize
           }
@@ -262,7 +275,7 @@ object ProducerPerformance extends Loggi
             nSends += 1
           }
         }catch {
-          case e: Exception => e.printStackTrace
+          case e: Exception => error("Error sending messages", e)
         }
         if(nSends % config.reportingInterval == 0) {
           reportTime = System.currentTimeMillis()

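The two new flags map directly onto the producer properties used elsewhere in this commit; request-num-acks defaults to -1, meaning the leader acknowledges a produce request only after all in-sync replicas have the data, which is what the replication tests rely on. A sketch of the resulting properties (keys and defaults are taken from the diff; the helper is illustrative):

    import java.util.Properties

    object PerfAckProps {
      def withAcks(props: Properties,
                   requiredAcks: Int = -1,   // -1 = wait for all in-sync replicas
                   timeoutMs: Int = 3000): Properties = {
        props.put("producer.request.required.acks", requiredAcks.toString)
        props.put("producer.request.timeout.ms", timeoutMs.toString)
        props
      }

      def main(args: Array[String]) {
        println(withAcks(new Properties()))
      }
    }
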
Modified: incubator/kafka/branches/0.8/system_test/common/util.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/common/util.sh?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/common/util.sh (original)
+++ incubator/kafka/branches/0.8/system_test/common/util.sh Tue Jul 24 18:13:01 2012
@@ -120,7 +120,7 @@ generate_kafka_properties_files() {
         # ======================
         keyword_to_replace="brokerid="
         string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
-        brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
+        brokerid_idx=$(( $brokerid_to_start + $i ))
         string_to_replace="${keyword_to_replace}${brokerid_idx}"
         # info "string to be replaced : [${string_to_be_replaced}]"
         # info "string to replace     : [${string_to_replace}]"

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Jul 24 18:13:01 2012
@@ -42,7 +42,7 @@ readonly num_kafka_server=3             
 readonly replica_factor=3                             # should be less than or equal to "num_kafka_server"
 readonly my_brokerid_to_start=0                       # this should be '0' for now
 readonly my_server_port_to_start=9091                 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=200                  # batch no. of messages by producer
+readonly producer_msg_batch_size=20                   # batch no. of messages by producer
 readonly consumer_timeout_ms=10000                    # elapsed time for consumer to timeout and exit
 
 # ====================================
@@ -198,7 +198,7 @@ start_producer_perf() {
         --topic ${this_topic} \
         --messages $no_msg_to_produce \
         --message-size 100 \
-        --threads 5 \
+        --threads 1 \
         --initial-message-id $init_msg_id \
         2>&1 >> $producer_perf_log_pathname
 }
@@ -213,6 +213,7 @@ start_console_consumer() {
         --topic $this_consumer_topic \
         --formatter 'kafka.consumer.ConsoleConsumer$DecodedMessageFormatter' \
         --consumer-timeout-ms $consumer_timeout_ms \
+        --from-beginning \
         2>&1 >> $console_consumer_log_pathname &
 }
 
@@ -387,19 +388,11 @@ start_test() {
         ldr_bkr_id=$?
         info "leader broker id: $ldr_bkr_id"
 
-        svr_idx=$(($ldr_bkr_id + 1))
+        svr_idx=$(($ldr_bkr_id))
 
-        # ==========================================================
-        # If KAFKA-350 is fixed, uncomment the following 3 lines to
-        # STOP the server for failure test
-        # ==========================================================
-        #stop_server $svr_idx
-        #info "sleeping for 10s"
-        #sleep 10
-
-        start_console_consumer $test_topic localhost:$zk_port
-        info "sleeping for 5s"
-        sleep 5
+        stop_server $svr_idx
+        info "sleeping for 10s"
+        sleep 10
 
         init_id=$(( ($i - 1) * $producer_msg_batch_size ))
         start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
@@ -407,15 +400,15 @@ start_test() {
         sleep 15
         echo
 
-        # ==========================================================
-        # If KAFKA-350 is fixed, uncomment the following 3 lines to
-        # START the server for failure test
-        # ==========================================================
-        #start_server $svr_idx
-        #info "sleeping for 30s"
-        #sleep 30
+        start_server $svr_idx
+        info "sleeping for 30s"
+        sleep 30
     done
 
+    start_console_consumer $test_topic localhost:$zk_port
+    info "sleeping for 30s"
+    sleep 30
+
     validate_results
     echo
 

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties Tue Jul 24 18:13:01 2012
@@ -31,6 +31,9 @@ log4j.appender.stdout.layout.ConversionP
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf=DEBUG
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+log4j.logger.kafka.producer.async=DEBUG
+log4j.logger.kafka.server.KafkaApis=TRACE
+log4j.logger.kafka.server.ReplicaManager=DEBUG
 
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf=DEBUG

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties Tue Jul 24 18:13:01 2012
@@ -96,7 +96,8 @@ log.retention.hours=168
 #log.retention.size=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+#log.file.size=536870912
+log.file.size=100
 
 # The interval at which log segments are checked to see if they can be deleted according 
 # to the retention policies
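
Shrinking log.file.size from 512 MB to 100 bytes is what makes this a multi-segment test: with the small batches configured above, nearly every produce request rolls a new segment, so replica recovery and high-watermark checkpointing are exercised across segment boundaries (LogRecoveryTest does the same in-process with logFileSize = 30). Rough arithmetic, assuming the ~30-byte writes seen in that test:

    object SegmentMath {
      // how many segments a given number of bytes will occupy
      def segmentsFor(bytesWritten: Int, segmentSize: Int): Int =
        (bytesWritten + segmentSize - 1) / segmentSize

      def main(args: Array[String]) {
        // the 600-byte high watermark from the multi-segment unit test
        println(segmentsFor(600, 100))  // 6 segments at log.file.size=100
      }
    }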