You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/24 20:13:04 UTC
svn commit: r1365199 [3/3] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/
core/src/main/sca...
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1365199&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Jul 24 18:13:01 2012
@@ -0,0 +1,288 @@
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.message.Message
+import java.io.RandomAccessFile
+import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import org.junit.Test
+
+class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+ // Two broker configs (createBrokerConfigs(2)), each wrapped in a KafkaConfig
+ // subclass that overrides the four replication/flush settings below.
+ val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+ override val replicaMaxLagTimeMs = 5000L
+ override val replicaMaxLagBytes = 10L
+ override val flushInterval = 10
+ override val replicaMinBytes = 20
+ })
+ // Topic and partition used by every test in this class.
+ val topic = "new-topic"
+ val partitionId = 0
+
+ // NOTE(review): brokerId1/brokerId2 are not referenced anywhere else in this class — confirm they are still needed.
+ val brokerId1 = 0
+ val brokerId2 = 1
+
+ // NOTE(review): port1/port2 are not referenced anywhere else in this class — confirm they are still needed.
+ val port1 = TestUtils.choosePort()
+ val port2 = TestUtils.choosePort()
+
+ // Broker handles; assigned at the start of each test method.
+ var server1: KafkaServer = null
+ var server2: KafkaServer = null
+
+ // First and second entries of `configs`, used to create server1 and server2.
+ val configProps1 = configs.head
+ val configProps2 = configs.last
+
+ // Path of the "highwatermark" file under "<logDir>/<topic>-0" for each broker config.
+ val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
+ val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
+
+ // Message batches. sendMessages() only ever sends sent1.
+ // NOTE(review): sent2 is not referenced anywhere else in this class — confirm it is still needed.
+ val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+ val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
+
+ // Mutable per-test state: the producer, read-only handles on the two
+ // highwatermark files, and the list of servers started by the test.
+ var producer: Producer[Int, Message] = null
+ var hwFile1: RandomAccessFile = null
+ var hwFile2: RandomAccessFile = null
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+  /**
+   * Checks the high watermark (hw) values found in the two brokers' highwatermark
+   * files when no broker fails during the test.
+   *
+   * Sends the `sent1` batch twice, shuts both brokers down so the hw is
+   * checkpointed, and then expects 60 in server1's file and 30 in server2's file.
+   */
+  @Test
+  def testHWCheckpointNoFailuresSingleLogSegment {
+    // start both servers
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    val leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", leader.exists(id => id == 0 || id == 1))
+
+    // the highwatermark files are opened only after a first batch has been sent
+    // (presumably so that they already exist on disk — confirm)
+    sendMessages()
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    try {
+      sendMessages()
+      // don't wait for follower to read the leader's hw
+      // shutdown the servers to allow the hw to be checkpointed
+      servers.foreach(_.shutdown())
+      producer.close()
+      // NOTE(review): the names below assume server1 hosts the leader, although the
+      // check above accepts either broker as leader — confirm
+      val leaderHW = readHW(hwFile1)
+      assertEquals(60L, leaderHW)
+      val followerHW = readHW(hwFile2)
+      assertEquals(30L, followerHW)
+    } finally {
+      // release the file handles even when one of the assertions above fails
+      hwFile1.close()
+      hwFile2.close()
+    }
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  /**
+   * Checks the high watermark (hw) values found in the two brokers' highwatermark
+   * files while the brokers are bounced one after the other.
+   *
+   * Sequence: send one batch, bounce server1 (leader is expected on broker 1),
+   * bounce server2 (leader is expected on broker 0), send a second batch, shut
+   * everything down and expect a checkpointed hw of 60 on both brokers.
+   */
+  def testHWCheckpointWithFailuresSingleLogSegment {
+    // start both servers
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", leader.exists(id => id == 0 || id == 1))
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    try {
+      assertEquals(0L, readHW(hwFile1))
+
+      sendMessages()
+
+      // kill the server hosting the preferred replica
+      server1.shutdown()
+      assertEquals(30L, readHW(hwFile1))
+
+      // check if leader moves to the other server
+      leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+      assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+      // bring the preferred replica back
+      server1.startup()
+
+      leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+      assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+      assertEquals(30L, readHW(hwFile1))
+      // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
+      server2.shutdown()
+      assertEquals(30L, readHW(hwFile2))
+
+      server2.startup()
+      leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+      assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
+
+      sendMessages()
+      // give some time for follower 1 to record leader HW of 60
+      Thread.sleep(500)
+      // shutdown the servers to allow the hw to be checkpointed
+      servers.foreach(_.shutdown())
+      Thread.sleep(200)
+      producer.close()
+      assert(hwFile1.length() > 0)
+      assert(hwFile2.length() > 0)
+      assertEquals(60L, readHW(hwFile1))
+      assertEquals(60L, readHW(hwFile2))
+    } finally {
+      // release the file handles even when one of the assertions above fails
+      hwFile1.close()
+      hwFile2.close()
+    }
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  /**
+   * Same scenario as the single-segment no-failure test, but with broker configs
+   * that additionally set `logFileSize = 30`.
+   *
+   * Sends 10 batches, opens the highwatermark files, sends 10 more batches, waits
+   * 500 ms, shuts both brokers down and expects a checkpointed hw of 600 on both.
+   */
+  def testHWCheckpointNoFailuresMultipleLogSegments {
+    // test-local configs; these intentionally shadow the class-level `configs`
+    // and highwatermark file paths, which do not set logFileSize
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val replicaMaxLagTimeMs = 5000L
+      override val replicaMaxLagBytes = 10L
+      override val flushInterval = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    val leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", leader.exists(id => id == 0 || id == 1))
+
+    sendMessages(10)
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    try {
+      sendMessages(10)
+
+      // give some time for follower 1 to record leader HW of 600
+      Thread.sleep(500)
+      // shutdown the servers to allow the hw to be checkpointed
+      servers.foreach(_.shutdown())
+      producer.close()
+      val leaderHW = readHW(hwFile1)
+      assertEquals(600L, leaderHW)
+      val followerHW = readHW(hwFile2)
+      assertEquals(600L, followerHW)
+    } finally {
+      // release the file handles even when one of the assertions above fails
+      hwFile1.close()
+      hwFile2.close()
+    }
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  /**
+   * Failure scenario run with broker configs that set `logFileSize = 30`,
+   * `flushInterval = 1000` and `flushSchedulerThreadRate = 10`.
+   *
+   * Sequence: send 2 batches, shut both brokers down (hw of 60 expected in both
+   * files), restart server2 and then server1, send 2 more batches, shut
+   * everything down and expect a checkpointed hw of 120 on both brokers.
+   */
+  def testHWCheckpointWithFailuresMultipleLogSegments {
+    // test-local configs; these intentionally shadow the class-level `configs`
+    // and highwatermark file paths
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val replicaMaxLagTimeMs = 5000L
+      override val replicaMaxLagBytes = 10L
+      override val flushInterval = 1000
+      override val flushSchedulerThreadRate = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.timeout.ms", "1000")
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", leader.exists(id => id == 0 || id == 1))
+
+    // use the class-level handles, as the other tests do, instead of shadowing them
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    try {
+      sendMessages(2)
+      // allow some time for the follower to get the leader HW
+      Thread.sleep(1000)
+      // shut down both servers so that each one checkpoints its hw
+      server1.shutdown()
+      server2.shutdown()
+      assertEquals(60L, readHW(hwFile1))
+      assertEquals(60L, readHW(hwFile2))
+
+      server2.startup()
+      // check if leader moves to the other server
+      leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+      assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+      assertEquals(60L, readHW(hwFile1))
+
+      // bring the preferred replica back
+      server1.startup()
+
+      assertEquals(60L, readHW(hwFile1))
+      assertEquals(60L, readHW(hwFile2))
+
+      sendMessages(2)
+      // allow some time for the follower to get the leader HW
+      Thread.sleep(1000)
+      // shutdown the servers to allow the hw to be checkpointed
+      servers.foreach(_.shutdown())
+      producer.close()
+      assert(hwFile1.length() > 0)
+      assert(hwFile2.length() > 0)
+      assertEquals(120L, readHW(hwFile1))
+      assertEquals(120L, readHW(hwFile2))
+    } finally {
+      // release the file handles even when one of the assertions above fails
+      hwFile1.close()
+      hwFile2.close()
+    }
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  /**
+   * Sends the `sent1` batch to `topic` through the shared producer, once per
+   * requested repetition.
+   *
+   * @param numMessages how many times the batch is sent (defaults to 1)
+   */
+  private def sendMessages(numMessages: Int = 1) {
+    (0 until numMessages).foreach { _ =>
+      val batch = new ProducerData[Int, Message](topic, 0, sent1)
+      producer.send(batch)
+    }
+  }
+
+  /**
+   * Reads the long stored in the first 8 bytes of the given file.
+   *
+   * @param hwFile open handle on a highwatermark file
+   * @return the long value read from offset 0
+   */
+  private def readHW(hwFile: RandomAccessFile): Long = {
+    // rewind first: the tests read the same handle several times
+    hwFile.seek(0L)
+    val checkpointedHW = hwFile.readLong()
+    checkpointedHW
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@ package kafka.server
import java.io.File
import kafka.consumer.SimpleConsumer
-import java.util.Properties
import org.junit.Test
import junit.framework.Assert._
import kafka.message.{Message, ByteBufferMessageSet}
@@ -50,7 +49,7 @@ class ServerShutdownTest extends JUnit3S
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
- val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+ val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
// send some messages
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@@ -65,7 +64,7 @@ class ServerShutdownTest extends JUnit3S
{
- val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+ val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
val consumer = new SimpleConsumer(host,
port,
1000000,
@@ -102,15 +101,4 @@ class ServerShutdownTest extends JUnit3S
}
}
-
- private def getProducerConfig(bufferSize: Int, connectTimeout: Int,
- reconnectInterval: Int): ProducerConfig = {
- val props = new Properties()
- props.put("zk.connect", zkConnect)
- props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
- props.put("buffer.size", bufferSize.toString)
- props.put("connect.timeout.ms", connectTimeout.toString)
- props.put("reconnect.interval", reconnectInterval.toString)
- new ProducerConfig(props)
- }
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jul 24 18:13:01 2012
@@ -293,6 +293,18 @@ object TestUtils extends Logging {
new Producer[K, V](new ProducerConfig(props))
}
+  /**
+   * Builds the producer properties shared by the tests: a "sync" producer that
+   * uses kafka.utils.FixedValuePartitioner, plus the supplied connection settings.
+   *
+   * @param zkConnect value stored under "zk.connect"
+   * @param bufferSize value stored (as a string) under "buffer.size"
+   * @param connectTimeout value stored (as a string) under "connect.timeout.ms"
+   * @param reconnectInterval value stored (as a string) under "reconnect.interval"
+   * @return a new Properties object; callers may add further settings to it
+   */
+  def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
+                        reconnectInterval: Int): Properties = {
+    val settings = List(
+      "producer.type" -> "sync",
+      "zk.connect" -> zkConnect,
+      "partitioner.class" -> "kafka.utils.FixedValuePartitioner",
+      "buffer.size" -> bufferSize.toString,
+      "connect.timeout.ms" -> connectTimeout.toString,
+      "reconnect.interval" -> reconnectInterval.toString
+    )
+    val producerProps = new Properties()
+    settings.foreach { case (key, value) => producerProps.put(key, value) }
+    producerProps
+  }
+
+
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
@@ -368,6 +380,11 @@ object TestUtils extends Logging {
pr
}
+  /**
+   * Calls ZkUtils.tryToBecomeLeaderForPartition once for every entry of the map,
+   * passing the entry's key and value as the last two arguments.
+   *
+   * NOTE(review): going by the parameter name, keys are presumably partition ids
+   * and values leader broker ids — confirm against ZkUtils.
+   */
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+    leaderPerPartitionMap.foreach { case (partition, leader) =>
+      ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partition, leader)
+    }
+  }
+
def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
val leaderLock = new ReentrantLock()
val leaderExists = leaderLock.newCondition()
@@ -381,7 +398,8 @@ object TestUtils extends Logging {
case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
leader
case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
- new LeaderExists(topic, partition, leaderExists))
+ new LeaderExistsListener(topic, partition, leaderLock, leaderExists))
+ info("No leader exists. Waiting for %d ms".format(timeoutMs))
leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
// check if leader is elected
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.junit.Assert._
+import kafka.common.KafkaException
class UtilsTest extends JUnitSuite {
@@ -29,7 +30,7 @@ class UtilsTest extends JUnitSuite {
@Test
def testSwallow() {
- Utils.swallow(logger.info, throw new IllegalStateException("test"))
+ Utils.swallow(logger.info, throw new KafkaException("test"))
}
@Test
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Tue Jul 24 18:13:01 2012
@@ -26,7 +26,7 @@ import kafka.utils.Utils
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
- val tickTime = 2000
+ val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Tue Jul 24 18:13:01 2012
@@ -25,11 +25,12 @@ trait ZooKeeperTestHarness extends JUnit
val zkConnect: String = TestZKUtils.zookeeperConnect
var zookeeper: EmbeddedZookeeper = null
var zkClient: ZkClient = null
+ val zkConnectionTimeout = 6000
+ val zkSessionTimeout = 6000
override def setUp() {
zookeeper = new EmbeddedZookeeper(zkConnect)
- zkClient = new ZkClient(zookeeper.connectString)
- zkClient.setZkSerializer(ZKStringSerializer)
+ zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
super.setUp
}
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Jul 24 18:13:01 2012
@@ -75,6 +75,15 @@ object ProducerPerformance extends Loggi
.withRequiredArg
.describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
.ofType(classOf[String])
+ val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(3000)
+ val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
+ "to complete")
+ .withRequiredArg()
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(-1)
val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
.withRequiredArg
.describedAs("size")
@@ -128,10 +137,11 @@ object ProducerPerformance extends Loggi
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
val seqIdMode = options.has(initialMessageIdOpt)
-
+ val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
+ val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
+
// override necessary flags in seqIdMode
if (seqIdMode) {
- isAsync = true
batchSize = 1
isFixSize = true
@@ -175,6 +185,9 @@ object ProducerPerformance extends Loggi
props.put("batch.size", config.batchSize.toString)
props.put("queue.enqueueTimeout.ms", "-1")
}
+ props.put("producer.request.required.acks", config.produceRequestRequiredAcks.toString)
+ props.put("producer.request.timeout.ms", config.produceRequestTimeoutMs.toString)
+
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Message, Message](producerConfig)
val seqIdNumDigit = 10 // no. of digits for max int value
@@ -198,12 +211,6 @@ object ProducerPerformance extends Loggi
val topicLabel = "Topic"
var leftPaddedSeqId : String = ""
- var messageSet: List[Message] = Nil
- if(config.isFixSize) {
- for(k <- 0 until config.batchSize) {
- messageSet ::= message
- }
- }
var j: Long = 0L
while(j < messagesPerThread) {
var strLength = config.messageSize
@@ -231,11 +238,17 @@ object ProducerPerformance extends Loggi
debug(seqMsgString)
message = new Message(seqMsgString.getBytes())
}
-
+
+ var messageSet: List[Message] = Nil
+ if(config.isFixSize) {
+ for(k <- 0 until config.batchSize) {
+ messageSet ::= message
+ }
+ }
+
if (!config.isFixSize) {
for(k <- 0 until config.batchSize) {
strLength = rand.nextInt(config.messageSize)
- val message = new Message(getByteArrayOfLength(strLength))
messageSet ::= message
bytesSent += message.payloadSize
}
@@ -262,7 +275,7 @@ object ProducerPerformance extends Loggi
nSends += 1
}
}catch {
- case e: Exception => e.printStackTrace
+ case e: Exception => error("Error sending messages", e)
}
if(nSends % config.reportingInterval == 0) {
reportTime = System.currentTimeMillis()
Modified: incubator/kafka/branches/0.8/system_test/common/util.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/common/util.sh?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/common/util.sh (original)
+++ incubator/kafka/branches/0.8/system_test/common/util.sh Tue Jul 24 18:13:01 2012
@@ -120,7 +120,7 @@ generate_kafka_properties_files() {
# ======================
keyword_to_replace="brokerid="
string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
- brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
+ brokerid_idx=$(( $brokerid_to_start + $i))
string_to_replace="${keyword_to_replace}${brokerid_idx}"
# info "string to be replaced : [${string_to_be_replaced}]"
# info "string to replace : [${string_to_replace}]"
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Jul 24 18:13:01 2012
@@ -42,7 +42,7 @@ readonly num_kafka_server=3
readonly replica_factor=3 # should be less than or equal to "num_kafka_server"
readonly my_brokerid_to_start=0 # this should be '0' for now
readonly my_server_port_to_start=9091 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=200 # batch no. of messsages by producer
+readonly producer_msg_batch_size=20 # batch no. of messages by producer
readonly consumer_timeout_ms=10000 # elapsed time for consumer to timeout and exit
# ====================================
@@ -198,7 +198,7 @@ start_producer_perf() {
--topic ${this_topic} \
--messages $no_msg_to_produce \
--message-size 100 \
- --threads 5 \
+ --threads 1 \
--initial-message-id $init_msg_id \
2>&1 >> $producer_perf_log_pathname
}
@@ -213,6 +213,7 @@ start_console_consumer() {
--topic $this_consumer_topic \
--formatter 'kafka.consumer.ConsoleConsumer$DecodedMessageFormatter' \
--consumer-timeout-ms $consumer_timeout_ms \
+ --from-beginning \
2>&1 >> $console_consumer_log_pathname &
}
@@ -387,19 +388,11 @@ start_test() {
ldr_bkr_id=$?
info "leader broker id: $ldr_bkr_id"
- svr_idx=$(($ldr_bkr_id + 1))
+ svr_idx=$(($ldr_bkr_id))
- # ==========================================================
- # If KAFKA-350 is fixed, uncomment the following 3 lines to
- # STOP the server for failure test
- # ==========================================================
- #stop_server $svr_idx
- #info "sleeping for 10s"
- #sleep 10
-
- start_console_consumer $test_topic localhost:$zk_port
- info "sleeping for 5s"
- sleep 5
+ stop_server $svr_idx
+ info "sleeping for 10s"
+ sleep 10
init_id=$(( ($i - 1) * $producer_msg_batch_size ))
start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
@@ -407,15 +400,15 @@ start_test() {
sleep 15
echo
- # ==========================================================
- # If KAFKA-350 is fixed, uncomment the following 3 lines to
- # START the server for failure test
- # ==========================================================
- #start_server $svr_idx
- #info "sleeping for 30s"
- #sleep 30
+ start_server $svr_idx
+ info "sleeping for 30s"
+ sleep 30
done
+ start_console_consumer $test_topic localhost:$zk_port
+ info "sleeping for 30s"
+ sleep 30
+
validate_results
echo
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/log4j.properties Tue Jul 24 18:13:01 2012
@@ -31,6 +31,9 @@ log4j.appender.stdout.layout.ConversionP
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+log4j.logger.kafka.producer.async=DEBUG
+log4j.logger.kafka.server.KafkaApis=TRACE
+log4j.logger.kafka.server.ReplicaManager=DEBUG
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf=DEBUG
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/server.properties Tue Jul 24 18:13:01 2012
@@ -96,7 +96,8 @@ log.retention.hours=168
#log.retention.size=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+#log.file.size=536870912
+log.file.size=100
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies