Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/21 19:17:30 UTC
svn commit: r1375670 [2/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/
core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/
core/sr...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Tue Aug 21 17:17:29 2012
@@ -193,7 +193,7 @@ class AdminTest extends JUnit3Suite with
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
newTopicMetadata.errorCode match {
- case ErrorMapping.UnknownTopicCode =>
+ case ErrorMapping.UnknownTopicOrPartitionCode =>
fail("Topic " + topic + " should've been automatically created")
case _ =>
assertEquals(topic, newTopicMetadata.topic)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Aug 21 17:17:29 2012
@@ -258,33 +258,13 @@ class ZookeeperConsumerConnectorTest ext
}
def testCompressionSetConsumption() {
- val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
- requestHandlerLogger.setLevel(Level.FATAL)
-
// send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum)
- // test consumer timeout logic
- val consumerConfig0 = new ConsumerConfig(
- TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
- override val consumerTimeoutMs = 5000
- }
- val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
- val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
- getMessages(100, topicMessageStreams0)
-
- // also check partition ownership
- val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
- val expected_1 = List( ("0", "group1_consumer0-0"),
- ("1", "group1_consumer0-0"))
- assertEquals(expected_1, actual_1)
-
- zkConsumerConnector0.shutdown
- // at this point, only some part of the message set was consumed. So consumed offset should still be 0
- // also fetched offset should be 0
- val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val receivedMessages = getMessages(400, topicMessageStreams1)
val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
@@ -298,8 +278,6 @@ class ZookeeperConsumerConnectorTest ext
assertEquals(expected_2, actual_2)
zkConsumerConnector1.shutdown
-
- requestHandlerLogger.setLevel(Level.ERROR)
}
def testConsumerDecoder() {
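With the consumer-timeout sub-test removed, testCompressionSetConsumption reduces to an order-insensitive comparison of sent and received messages keyed by checksum. A self-contained sketch of that pattern (the byte payloads and local checksum function are illustrative stand-ins for Message.checksum):

    import java.util.zip.CRC32

    // Order-insensitive comparison by checksum, mirroring the sortWith assertions above.
    def checksum(payload: Array[Byte]): Long = {
      val crc = new CRC32()
      crc.update(payload, 0, payload.length)
      crc.getValue
    }
    val sent     = List("a".getBytes, "b".getBytes)
    val received = List("b".getBytes, "a".getBytes)
    val sortedSent     = sent.sortWith((s, t) => checksum(s) < checksum(t))
    val sortedReceived = received.sortWith((s, t) => checksum(s) < checksum(t))
    assert(sortedSent.map(checksum) == sortedReceived.map(checksum))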
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Aug 21 17:17:29 2012
@@ -25,13 +25,13 @@ import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import kafka.message.Message
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
import org.apache.log4j.{Level, Logger}
import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
-import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
import kafka.admin.{AdminUtils, CreateTopicCommand}
/**
@@ -207,7 +207,7 @@ class PrimitiveApiTest extends JUnit3Sui
}
fail("Expected exception when fetching message with invalid partition")
} catch {
- case e: InvalidPartitionException => "this is good"
+ case e: UnknownTopicOrPartitionException => "this is good"
}
}
@@ -273,7 +273,7 @@ class PrimitiveApiTest extends JUnit3Sui
response.messageSet(topic, -1).iterator
fail("Expected exception when fetching message with invalid partition")
} catch {
- case e: InvalidPartitionException => "this is good"
+ case e: UnknownTopicOrPartitionException => "this is good"
}
}
@@ -336,7 +336,7 @@ class PrimitiveApiTest extends JUnit3Sui
CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List(newTopic),
- zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
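The fail-then-catch idiom updated above recurs throughout this file; a minimal sketch of a helper that captures it once (hypothetical, not part of this commit):

    // Run a block and pass only if the expected exception type is thrown.
    def expectException[T <: Throwable](clazz: Class[T])(block: => Unit) {
      var thrown: Option[Throwable] = None
      try { block } catch { case e: Throwable => thrown = Some(e) }
      thrown match {
        case Some(e) if clazz.isInstance(e) => // expected, "this is good"
        case Some(e) => throw new AssertionError("Expected " + clazz.getName + " but got " + e.getClass.getName)
        case None    => throw new AssertionError("Expected " + clazz.getName + " but nothing was thrown")
      }
    }
    // usage: expectException(classOf[UnknownTopicOrPartitionException]) { response.messageSet(topic, -1).iterator }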
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Aug 21 17:17:29 2012
@@ -21,14 +21,13 @@ import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import java.nio.ByteBuffer
-import kafka.log.LogManager
import junit.framework.Assert._
import org.easymock.EasyMock
import kafka.network._
import kafka.cluster.Broker
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
-import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
import kafka.common.ErrorMapping
import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
@@ -99,16 +98,10 @@ class TopicMetadataTest extends JUnit3Su
}
private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
- // topic metadata request only requires 2 APIs from the log manager
- val logManager = EasyMock.createMock(classOf[LogManager])
- val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+ // topic metadata request only requires 1 call from the replica manager
val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(replicaManager.config).andReturn(configs.head)
- EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
- EasyMock.expect(logManager.config).andReturn(configs.head)
EasyMock.replay(replicaManager)
- EasyMock.replay(logManager)
- EasyMock.replay(kafkaZookeeper)
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic))
@@ -119,8 +112,7 @@ class TopicMetadataTest extends JUnit3Su
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
- null, null, null, 1)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@@ -135,7 +127,6 @@ class TopicMetadataTest extends JUnit3Su
val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
// verify the expected calls occurred in the right order
- EasyMock.verify(kafkaZookeeper)
EasyMock.verify(receivedRequest)
topicMetadata
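With the LogManager and KafkaZooKeeper mocks gone, the test's mocking shrinks to one expect/replay/verify cycle on ReplicaManager. A self-contained EasyMock sketch of that cycle (a hypothetical trait stands in for ReplicaManager):

    import org.easymock.EasyMock

    object MockCycleSketch {
      trait ConfigHolder { def config: String }  // stand-in for the one mocked collaborator
      def main(args: Array[String]) {
        val holder = EasyMock.createMock(classOf[ConfigHolder])
        EasyMock.expect(holder.config).andReturn("broker-config")  // the single expected call
        EasyMock.replay(holder)
        assert(holder.config == "broker-config")
        EasyMock.verify(holder)  // fails if the expected call never happened
      }
    }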
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Aug 21 17:17:29 2012
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.common.UnknownTopicException
+import kafka.utils.TestUtils._
+import kafka.common.UnknownTopicOrPartitionException
object LogOffsetTest {
val random = new Random()
@@ -70,7 +71,7 @@ class LogOffsetTest extends JUnit3Suite
simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
fail("Should fail with UnknownTopicException since topic foo was never created")
}catch {
- case e: UnknownTopicException => // this is ok
+ case e: UnknownTopicOrPartitionException => // this is ok
}
}
@@ -97,6 +98,7 @@ class LogOffsetTest extends JUnit3Suite
val offsets = log.getOffsetsBefore(offsetRequest)
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
OffsetRequest.LatestTime, 10)
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
@@ -156,6 +158,7 @@ class LogOffsetTest extends JUnit3Suite
println("Offsets = " + offsets.mkString(","))
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
}
@@ -182,6 +185,7 @@ class LogOffsetTest extends JUnit3Suite
assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
OffsetRequest.EarliestTime, 10)
assertTrue( (Array(0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]) )
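The new waitUntilTrue calls guard against issuing a consumer request before leadership has settled on the local broker. A minimal sketch of the poll loop, assuming semantics like TestUtils.waitUntilTrue:

    // Poll a condition until it holds or the deadline passes; the final check after
    // the loop avoids a false negative when the condition flips right at the deadline.
    def waitUntilTrueSketch(condition: () => Boolean, waitTimeMs: Long): Boolean = {
      val startTime = System.currentTimeMillis
      while (System.currentTimeMillis < startTime + waitTimeMs) {
        if (condition())
          return true
        Thread.sleep(100)  // poll interval chosen for illustration
      }
      condition()
    }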
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -261,10 +261,10 @@ class AsyncProducerTest extends JUnit3Su
topicPartitionInfos)
try {
handler.partitionAndCollate(producerDataList)
- fail("Should fail with InvalidPartitionException")
+ fail("Should fail with UnknownTopicOrPartitionException")
}
catch {
- case e: InvalidPartitionException => // expected, do nothing
+ case e: UnknownTopicOrPartitionException => // expected, do nothing
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -89,7 +89,7 @@ class ProducerTest extends JUnit3Suite w
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
- zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
val props1 = new util.Properties()
@@ -155,7 +155,7 @@ class ProducerTest extends JUnit3Suite w
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
- zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
val producer1 = new Producer[String, String](producerConfig1)
@@ -210,7 +210,7 @@ class ProducerTest extends JUnit3Suite w
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
- zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -271,7 +271,7 @@ class ProducerTest extends JUnit3Suite w
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
- zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay
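The same wait-for-metadata block now appears four times in this file; a sketch of a local helper the tests could share (all names taken from this diff; assumes the fixture's zkClient and zookeeper fields):

    // Block until topic metadata no longer reports UnknownTopicOrPartitionCode.
    def waitUntilMetadataIsPropagated(topic: String) {
      assertTrue("Topic " + topic + " not created after timeout",
        TestUtils.waitUntilTrue(() =>
          AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head.errorCode !=
            ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
    }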
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -154,7 +154,7 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(request.correlationId, response.correlationId)
Assert.assertEquals(response.errors.length, response.offsets.length)
Assert.assertEquals(3, response.errors.length)
- response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
+ response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
response.offsets.foreach(Assert.assertEquals(-1L, _))
// #2 - test that we get correct offsets when partition is owned by broker
@@ -176,7 +176,7 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
// the middle message should have been rejected because the broker doesn't lead the partition
- Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
+ Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
Assert.assertEquals(-1, response2.offsets(1))
}
@@ -212,28 +212,4 @@ class SyncProducerTest extends JUnit3Sui
// make sure we don't wait fewer than timeoutMs for a response
Assert.assertTrue((t2-t1) >= timeoutMs)
}
-
- @Test
- def testProduceRequestForUnknownTopic() {
- val server = servers.head
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", server.socketServer.port.toString)
- props.put("buffer.size", "102400")
- props.put("connect.timeout.ms", "300")
- props.put("reconnect.interval", "500")
- props.put("max.message.size", "100")
-
- val producer = new SyncProducer(new SyncProducerConfig(props))
- val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-
- val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
- val response = producer.send(request)
-
- Assert.assertNotNull(response)
- Assert.assertEquals(request.correlationId, response.correlationId)
- Assert.assertEquals(response.errors.length, response.offsets.length)
- Assert.assertEquals(3, response.errors.length)
- response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
- }
}
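The removed test duplicated the unknown-topic assertions of the first test verbatim. For reference, a self-contained sketch of the per-message contract those assertions rely on: errors(i) pairs with offsets(i), and a rejected message carries the error code plus offset -1 (the numeric code below is an assumption for illustration only):

    val unknownTopicOrPartitionCode: Short = 3  // assumed value, illustration only
    val errors  = Array[Short](0, unknownTopicOrPartitionCode, 0)
    val offsets = Array[Long](0L, -1L, 100L)
    errors.zip(offsets).foreach { case (error, offset) =>
      if (error == unknownTopicOrPartitionCode)
        assert(offset == -1L)  // failed message: no offset assigned
    }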
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Tue Aug 21 17:17:29 2012
@@ -21,8 +21,9 @@ import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
import org.junit.Assert._
-import kafka.utils.{KafkaScheduler, TestUtils, MockTime}
import kafka.common.KafkaException
+import kafka.cluster.Replica
+import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime}
class HighwatermarkPersistenceTest extends JUnit3Suite {
@@ -41,33 +42,31 @@ class HighwatermarkPersistenceTest exten
// create replica manager
val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
replicaManager.startup()
- replicaManager.startHighWaterMarksCheckPointThread()
- // sleep until flush ms
- Thread.sleep(configs.head.defaultFlushIntervalMs)
- var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+ replicaManager.checkpointHighWatermarks()
+ var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
assertEquals(0L, fooPartition0Hw)
- val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+ val partition0 = replicaManager.getOrCreatePartition(topic, 0)
// create leader log
val log0 = getMockLog
// create leader and follower replicas
- val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
- val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
- replicaManager.checkpointHighwaterMarks()
- fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
- assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+ val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
+ partition0.addReplicaIfNotExists(leaderReplicaPartition0)
+ val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
+ partition0.addReplicaIfNotExists(followerReplicaPartition0)
+ replicaManager.checkpointHighWatermarks()
+ fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+ assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
try {
- followerReplicaPartition0.highWatermark()
- fail("Should fail with IllegalStateException")
+ followerReplicaPartition0.highWatermark
+ fail("Should fail with KafkaException")
}catch {
case e: KafkaException => // this is ok
}
- // set the leader
- partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
// set the highwatermark for local replica
- partition0.leaderHW(Some(5L))
- replicaManager.checkpointHighwaterMarks()
- fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
- assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+ partition0.getReplica().get.highWatermark = 5L
+ replicaManager.checkpointHighWatermarks()
+ fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+ assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
EasyMock.verify(zkClient)
EasyMock.verify(log0)
}
@@ -84,48 +83,46 @@ class HighwatermarkPersistenceTest exten
// create replica manager
val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
replicaManager.startup()
- replicaManager.checkpointHighwaterMarks()
- var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ replicaManager.checkpointHighWatermarks()
+ var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
assertEquals(0L, topic1Partition0Hw)
- val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+ val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
// create leader log
val topic1Log0 = getMockLog
- // create leader and follower replicas
- val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
- replicaManager.checkpointHighwaterMarks()
- topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
- assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
- // set the leader
- topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+ // create a local replica for topic1
+ val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
+ topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
+ replicaManager.checkpointHighWatermarks()
+ topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
// set the highwatermark for local replica
- topic1Partition0.leaderHW(Some(5L))
- replicaManager.checkpointHighwaterMarks()
- topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
- assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+ topic1Partition0.getReplica().get.highWatermark = 5L
+ replicaManager.checkpointHighWatermarks()
+ topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+ assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
- val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+ val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
// create leader log
val topic2Log0 = getMockLog
- // create leader and follower replicas
- val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
- replicaManager.checkpointHighwaterMarks()
- var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
- assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
- // set the leader
- topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+ // create a local replica for topic2
+ val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
+ topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
+ replicaManager.checkpointHighWatermarks()
+ var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+ assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
// set the highwatermark for local replica
- topic2Partition0.leaderHW(Some(15L))
- assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+ topic2Partition0.getReplica().get.highWatermark = 15L
+ assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark)
// change the highwatermark for topic1
- topic1Partition0.leaderHW(Some(10L))
- assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
- replicaManager.checkpointHighwaterMarks()
+ topic1Partition0.getReplica().get.highWatermark = 10L
+ assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
+ replicaManager.checkpointHighWatermarks()
// verify checkpointed hw for topic 2
- topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+ topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
assertEquals(15L, topic2Partition0Hw)
// verify checkpointed hw for topic 1
- topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+ topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
assertEquals(10L, topic1Partition0Hw)
EasyMock.verify(zkClient)
EasyMock.verify(topic1Log0)
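The renamed API pairs an explicit checkpointHighWatermarks() call with highWatermarkCheckpoint.read(topic, partition). A self-contained sketch of that round-trip (hypothetical in-memory checkpoint; the real one persists to disk), including the read-as-0L behavior the first assertEquals depends on:

    // In-memory stand-in for the high-watermark checkpoint: write all partitions'
    // high watermarks at once, read one back; a missing entry reads as 0L.
    class HighWatermarkCheckpointSketch {
      private var checkpointed = Map.empty[(String, Int), Long]
      def write(hwByPartition: Map[(String, Int), Long]) { checkpointed = hwByPartition }
      def read(topic: String, partition: Int): Long =
        checkpointed.getOrElse((topic, partition), 0L)
    }
    val checkpoint = new HighWatermarkCheckpointSketch
    assert(checkpoint.read("foo", 0) == 0L)  // nothing checkpointed yet
    checkpoint.write(Map(("foo", 0) -> 5L))
    assert(checkpoint.read("foo", 0) == 5L)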
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Aug 21 17:17:29 2012
@@ -23,8 +23,7 @@ import kafka.cluster.{Partition, Replica
import org.easymock.EasyMock
import kafka.log.Log
import org.junit.Assert._
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils}
+import kafka.utils._
class ISRExpirationTest extends JUnit3Suite {
@@ -37,35 +36,24 @@ class ISRExpirationTest extends JUnit3Su
def testISRExpirationForStuckFollowers() {
val time = new MockTime
- // create leader replica
- val log = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
- EasyMock.replay(log)
+ val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
- // add one partition
- val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+ // create one partition and all replicas
+ val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
- // set remote replicas leo to something low, like 2
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
-
- time.sleep(150)
- leaderReplica.logEndOffset(Some(5L))
- var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
- assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+ // let the follower catch up to 10
+ (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
+ var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+ assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
- // add all replicas back to the ISR
- partition0.inSyncReplicas ++= partition0.assignedReplicas()
- assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-
- leaderReplica.logEndOffset(Some(5L))
- // let the follower catch up only upto 3
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
+ // let some time pass
time.sleep(150)
- // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
+
+ // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't
// pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
- partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+ partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)
}
@@ -73,104 +61,41 @@ class ISRExpirationTest extends JUnit3Su
def testISRExpirationForSlowFollowers() {
val time = new MockTime
// create leader replica
- val log = getLogWithHW(15L)
+ val log = getLogWithLogEndOffset(15L, 1)
// add one partition
- val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L)
+ val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 4
- (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
+ (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L)
- time.sleep(150)
- leaderReplica.logEndOffset(Some(15L))
- time.sleep(10)
- (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
-
- val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+ // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap is larger than
+ // replicaMaxLagBytes, the follower is out of sync.
+ val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)
}
- def testISRExpirationForMultiplePartitions() {
- val time = new MockTime
- // mock zkclient
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- EasyMock.replay(zkClient)
- // create kafka scheduler
- val scheduler = new KafkaScheduler(2)
- // create replica manager
- val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
- try {
- val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
- // create leader log
- val log0 = getLogWithHW(5L)
-
- // create leader and follower replicas
- val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
- val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
-
- partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0)
- // set the leader and its hw and the hw update time
- partition0.leaderId(Some(configs.head.brokerId))
- partition0.leaderHW(Some(5L))
-
- // set the leo for non-leader replicas to something low
- (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
-
- val log1 = getLogWithHW(15L)
- // create leader and follower replicas for partition 1
- val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet)
- val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet)
- val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition0)
-
- partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1)
- // set the leader and its hw and the hw update time
- partition1.leaderId(Some(configs.head.brokerId))
- partition1.leaderHW(Some(15L))
-
- // set the leo for non-leader replicas to something low
- (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
-
- time.sleep(150)
- leaderReplicaPartition0.logEndOffset(Some(4L))
- leaderReplicaPartition1.logEndOffset(Some(4L))
- time.sleep(10)
- (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
-
- val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
- assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
-
- val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
- assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
-
- EasyMock.verify(log0)
- EasyMock.verify(log1)
- }catch {
- case e => e.printStackTrace()
- }finally {
- replicaManager.shutdown()
- }
- }
-
- private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
- localLog: Log, leaderHW: Long): Partition = {
- val partition = new Partition(topic, partitionId, time)
- val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
+ private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
+ localLog: Log): Partition = {
+ val leaderId = config.brokerId
+ val replicaManager = new ReplicaManager(config, time, null, null, null)
+ val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+ val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
- partition.assignedReplicas(Some(allReplicas.toSet))
+ allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
// set in sync replicas for this partition to all the assigned replicas
partition.inSyncReplicas = allReplicas.toSet
// set the leader and its hw and the hw update time
- partition.leaderId(Some(leaderId))
- partition.leaderHW(Some(leaderHW))
+ partition.leaderReplicaIdOpt = Some(leaderId)
partition
}
- private def getLogWithHW(hw: Long): Log = {
+ private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
val log1 = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
+ EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls)
EasyMock.replay(log1)
log1
@@ -178,7 +103,7 @@ class ISRExpirationTest extends JUnit3Su
private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
configs.filter(_.brokerId != leaderId).map { config =>
- new Replica(config.brokerId, partition, topic, time)
+ new Replica(config.brokerId, partition, time)
}
}
}
\ No newline at end of file
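The two remaining tests target the two out-of-sync conditions: a stuck follower (no progress for longer than replicaMaxLagTimeMs) and a slow follower (logEndOffset more than replicaMaxLagBytes behind the leader). A simplified, self-contained sketch of that predicate (names and values illustrative, not the Partition implementation):

    case class FollowerState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

    def isOutOfSync(leaderEndOffset: Long, follower: FollowerState, nowMs: Long,
                    maxLagTimeMs: Long, maxLagBytes: Long): Boolean = {
      val stuck = nowMs - follower.lastCaughtUpTimeMs > maxLagTimeMs    // no fetch for too long
      val slow  = leaderEndOffset - follower.logEndOffset > maxLagBytes // too far behind in bytes
      stuck || slow
    }
    // stuck-follower case: caught up to 10 of 15 (not slow), but no fetch since t=0
    assert(isOutOfSync(15L, FollowerState(10L, 0L), nowMs = 150L, maxLagTimeMs = 100L, maxLagBytes = 10L))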
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Aug 21 17:17:29 2012
@@ -8,7 +8,6 @@ import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.message.Message
import kafka.producer.{ProducerConfig, ProducerData, Producer}
-import org.junit.Test
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -41,7 +40,6 @@ class LogRecoveryTest extends JUnit3Suit
var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- @Test
def testHWCheckpointNoFailuresSingleLogSegment {
// start both servers
server1 = TestUtils.createServer(configProps1)
@@ -63,15 +61,18 @@ class LogRecoveryTest extends JUnit3Suit
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
sendMessages(2)
- // don't wait for follower to read the leader's hw
- // shutdown the servers to allow the hw to be checkpointed
- servers.map(server => server.shutdown())
+
+ // give some time for follower 1 to record leader HW of 60
+ assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
+
+ servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
producer.close()
val leaderHW = hwFile1.read(topic, 0)
assertEquals(60L, leaderHW)
val followerHW = hwFile2.read(topic, 0)
- assertEquals(30L, followerHW)
- servers.map(server => Utils.rm(server.config.logDir))
+ assertEquals(60L, followerHW)
+ servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)})
}
def testHWCheckpointWithFailuresSingleLogSegment {
@@ -124,13 +125,13 @@ class LogRecoveryTest extends JUnit3Suit
sendMessages()
// give some time for follower 1 to record leader HW of 60
assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
- server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
// shutdown the servers to allow the hw to be checkpointed
- servers.map(server => server.shutdown())
+ servers.foreach(server => server.shutdown())
producer.close()
assertEquals(60L, hwFile1.read(topic, 0))
assertEquals(60L, hwFile2.read(topic, 0))
- servers.map(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDir))
}
def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -166,15 +167,15 @@ class LogRecoveryTest extends JUnit3Suit
sendMessages(20)
// give some time for follower 1 to record leader HW of 600
assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
- server2.getReplica(topic, 0).get.highWatermark() == 600L, 1000))
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark == 600L, 1000))
// shutdown the servers to allow the hw to be checkpointed
- servers.map(server => server.shutdown())
+ servers.foreach(server => server.shutdown())
producer.close()
val leaderHW = hwFile1.read(topic, 0)
assertEquals(600L, leaderHW)
val followerHW = hwFile2.read(topic, 0)
assertEquals(600L, followerHW)
- servers.map(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDir))
}
def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -211,7 +212,7 @@ class LogRecoveryTest extends JUnit3Suit
sendMessages(2)
// allow some time for the follower to get the leader HW
assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
- server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
// kill the server hosting the preferred replica
server1.shutdown()
server2.shutdown()
@@ -234,13 +235,13 @@ class LogRecoveryTest extends JUnit3Suit
sendMessages(2)
// allow some time for the follower to get the leader HW
assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
- server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000))
+ server1.replicaManager.getReplica(topic, 0).get.highWatermark == 120L, 1000))
// shutdown the servers to allow the hw to be checkpointed
- servers.map(server => server.shutdown())
+ servers.foreach(server => server.shutdown())
producer.close()
assertEquals(120L, hwFile1.read(topic, 0))
assertEquals(120L, hwFile2.read(topic, 0))
- servers.map(server => Utils.rm(server.config.logDir))
+ servers.foreach(server => Utils.rm(server.config.logDir))
}
private def sendMessages(numMessages: Int = 1) {
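Beyond the checkpointing changes, this file consistently swaps servers.map(...) for servers.foreach(...) where only side effects are wanted; foreach signals intent and builds no result collection. A one-line illustration:

    val servers = Seq("server1", "server2")
    servers.foreach(s => println("shutting down " + s))  // side effect only, no result built
    val logDirs = servers.map(s => "/tmp/" + s)          // map is for producing a new collection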
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Aug 21 17:17:29 2012
@@ -456,6 +456,19 @@ object TestUtils extends Logging {
// should never hit here
throw new RuntimeException("unexpected error")
}
+
+ def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
+ val partitionOpt = server.replicaManager.getPartition(topic, partitionId)
+ partitionOpt match {
+ case None => false
+ case Some(partition) =>
+ val replicaOpt = partition.leaderReplicaIfLocal
+ replicaOpt match {
+ case None => false
+ case Some(_) => true
+ }
+ }
+ }
}
object ControllerTestUtils{
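The nested matches in the new isLeaderLocalOnBroker collapse to a one-liner with Option combinators; an equivalent sketch (same semantics, assuming leaderReplicaIfLocal returns an Option as the match above implies; not part of this commit):

    def isLeaderLocalOnBrokerSketch(topic: String, partitionId: Int, server: KafkaServer): Boolean =
      server.replicaManager.getPartition(topic, partitionId).exists(_.leaderReplicaIfLocal.isDefined)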
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Aug 21 17:17:29 2012
@@ -150,7 +150,7 @@ cleanup() {
get_leader_brokerid() {
log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
info "found the log line: $log_line"
- broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ',' '{print $1}'`
+ broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ':' '{print $1}'`
return $broker_id
}