You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/21 19:17:30 UTC

svn commit: r1375670 [2/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/sr...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Tue Aug 21 17:17:29 2012
@@ -193,7 +193,7 @@ class AdminTest extends JUnit3Suite with
 
     val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata.errorCode match {
-      case ErrorMapping.UnknownTopicCode =>
+      case ErrorMapping.UnknownTopicOrPartitionCode =>
         fail("Topic " + topic + " should've been automatically created")
       case _ =>
         assertEquals(topic, newTopicMetadata.topic)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Aug 21 17:17:29 2012
@@ -258,33 +258,13 @@ class ZookeeperConsumerConnectorTest ext
   }
 
   def testCompressionSetConsumption() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
-    requestHandlerLogger.setLevel(Level.FATAL)
-
     // send some messages to each broker
     val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
     val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
     val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum)
 
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 5000
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
-    getMessages(100, topicMessageStreams0)
-
-    // also check partition ownership
-    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer0-0"),
-                           ("1", "group1_consumer0-0"))
-    assertEquals(expected_1, actual_1)
-
-    zkConsumerConnector0.shutdown
-    // at this point, only some part of the message set was consumed. So consumed offset should still be 0
-    // also fetched offset should be 0
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
     val receivedMessages = getMessages(400, topicMessageStreams1)
     val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
@@ -298,8 +278,6 @@ class ZookeeperConsumerConnectorTest ext
     assertEquals(expected_2, actual_2)
 
     zkConsumerConnector1.shutdown
-
-    requestHandlerLogger.setLevel(Level.ERROR)
   }
 
   def testConsumerDecoder() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Aug 21 17:17:29 2012
@@ -25,13 +25,13 @@ import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
 import kafka.message.Message
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.common.{ErrorMapping, InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 import kafka.admin.{AdminUtils, CreateTopicCommand}
 
 /**
@@ -207,7 +207,7 @@ class PrimitiveApiTest extends JUnit3Sui
         }
         fail("Expected exception when fetching message with invalid partition")
       } catch {
-        case e: InvalidPartitionException => "this is good"
+        case e: UnknownTopicOrPartitionException => "this is good"
       }
     }
 
@@ -273,7 +273,7 @@ class PrimitiveApiTest extends JUnit3Sui
           response.messageSet(topic, -1).iterator
         fail("Expected exception when fetching message with invalid partition")
       } catch {
-        case e: InvalidPartitionException => "this is good"
+        case e: UnknownTopicOrPartitionException => "this is good"
       }
     }
 
@@ -336,7 +336,7 @@ class PrimitiveApiTest extends JUnit3Sui
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List(newTopic),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Aug 21 17:17:29 2012
@@ -21,14 +21,13 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
-import kafka.log.LogManager
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import kafka.network._
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
 import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
 
@@ -99,16 +98,10 @@ class TopicMetadataTest extends JUnit3Su
   }
 
   private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
-    // topic metadata request only requires 2 APIs from the log manager
-    val logManager = EasyMock.createMock(classOf[LogManager])
-    val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
-    EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
-    EasyMock.expect(logManager.config).andReturn(configs.head)
     EasyMock.replay(replicaManager)
-    EasyMock.replay(logManager)
-    EasyMock.replay(kafkaZookeeper)
 
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic))
@@ -119,8 +112,7 @@ class TopicMetadataTest extends JUnit3Su
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
-                             null, null, null, 1)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
@@ -135,7 +127,6 @@ class TopicMetadataTest extends JUnit3Su
     val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
 
     // verify the expected calls to log manager occurred in the right order
-    EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
 
     topicMetadata

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Aug 21 17:17:29 2012
@@ -30,7 +30,8 @@ import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.common.UnknownTopicException
+import kafka.utils.TestUtils._
+import kafka.common.UnknownTopicOrPartitionException
 
 object LogOffsetTest {
   val random = new Random()  
@@ -70,7 +71,7 @@ class LogOffsetTest extends JUnit3Suite 
       simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
       fail("Should fail with UnknownTopicException since topic foo was never created")
     }catch {
-      case e: UnknownTopicException => // this is ok
+      case e: UnknownTopicOrPartitionException => // this is ok
     }
   }
 
@@ -97,6 +98,7 @@ class LogOffsetTest extends JUnit3Suite 
     val offsets = log.getOffsetsBefore(offsetRequest)
     assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
 
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
                                                           OffsetRequest.LatestTime, 10)
     assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
@@ -156,6 +158,7 @@ class LogOffsetTest extends JUnit3Suite 
     println("Offsets = " + offsets.mkString(","))
     assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
 
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
     assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
   }
@@ -182,6 +185,7 @@ class LogOffsetTest extends JUnit3Suite 
 
     assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
 
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
                                                           OffsetRequest.EarliestTime, 10)
     assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -261,10 +261,10 @@ class AsyncProducerTest extends JUnit3Su
                                                          topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
-      fail("Should fail with InvalidPartitionException")
+      fail("Should fail with UnknownTopicOrPartitionException")
     }
     catch {
-      case e: InvalidPartitionException => // expected, do nothing
+      case e: UnknownTopicOrPartitionException => // expected, do nothing
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -89,7 +89,7 @@ class ProducerTest extends JUnit3Suite w
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val props1 = new util.Properties()
@@ -155,7 +155,7 @@ class ProducerTest extends JUnit3Suite w
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](producerConfig1)
@@ -210,7 +210,7 @@ class ProducerTest extends JUnit3Suite w
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -271,7 +271,7 @@ class ProducerTest extends JUnit3Suite w
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Aug 21 17:17:29 2012
@@ -154,7 +154,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owned by broker
@@ -176,7 +176,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
+    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 
@@ -212,28 +212,4 @@ class SyncProducerTest extends JUnit3Sui
     // make sure we don't wait fewer than timeoutMs for a response
     Assert.assertTrue((t2-t1) >= timeoutMs)
   }
-
-  @Test
-  def testProduceRequestForUnknownTopic() {
-    val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
-    props.put("max.message.size", "100")
-
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
-
-    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
-    val response = producer.send(request)
-
-    Assert.assertNotNull(response)
-    Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
-  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Tue Aug 21 17:17:29 2012
@@ -21,8 +21,9 @@ import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.junit.Assert._
-import kafka.utils.{KafkaScheduler, TestUtils, MockTime}
 import kafka.common.KafkaException
+import kafka.cluster.Replica
+import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime}
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
@@ -41,33 +42,31 @@ class HighwatermarkPersistenceTest exten
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
     replicaManager.startup()
-    replicaManager.startHighWaterMarksCheckPointThread()
-    // sleep until flush ms
-    Thread.sleep(configs.head.defaultFlushIntervalMs)
-    var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    replicaManager.checkpointHighWatermarks()
+    var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
     assertEquals(0L, fooPartition0Hw)
-    val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0)
     // create leader log
     val log0 = getMockLog
     // create leader and follower replicas
-    val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
-    val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
-    replicaManager.checkpointHighwaterMarks()
-    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
+    partition0.addReplicaIfNotExists(leaderReplicaPartition0)
+    val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
+    partition0.addReplicaIfNotExists(followerReplicaPartition0)
+    replicaManager.checkpointHighWatermarks()
+    fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
     try {
-      followerReplicaPartition0.highWatermark()
-      fail("Should fail with IllegalStateException")
+      followerReplicaPartition0.highWatermark
+      fail("Should fail with KafkaException")
     }catch {
       case e: KafkaException => // this is ok
     }
-    // set the leader
-    partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
     // set the highwatermark for local replica
-    partition0.leaderHW(Some(5L))
-    replicaManager.checkpointHighwaterMarks()
-    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    partition0.getReplica().get.highWatermark = 5L
+    replicaManager.checkpointHighWatermarks()
+    fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
     EasyMock.verify(zkClient)
     EasyMock.verify(log0)
   }
@@ -84,48 +83,46 @@ class HighwatermarkPersistenceTest exten
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
     replicaManager.startup()
-    replicaManager.checkpointHighwaterMarks()
-    var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    replicaManager.checkpointHighWatermarks()
+    var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
-    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
     // create leader log
     val topic1Log0 = getMockLog
-    // create leader and follower replicas
-    val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
-    replicaManager.checkpointHighwaterMarks()
-    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
-    assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
-    // set the leader
-    topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+    // create a local replica for topic1
+    val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
+    topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
+    replicaManager.checkpointHighWatermarks()
+    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
     // set the highwatermark for local replica
-    topic1Partition0.leaderHW(Some(5L))
-    replicaManager.checkpointHighwaterMarks()
-    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
-    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+    topic1Partition0.getReplica().get.highWatermark = 5L
+    replicaManager.checkpointHighWatermarks()
+    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
     assertEquals(5L, topic1Partition0Hw)
     // add another partition and set highwatermark
-    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
     // create leader log
     val topic2Log0 = getMockLog
-    // create leader and follower replicas
-    val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
-    replicaManager.checkpointHighwaterMarks()
-    var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
-    assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
-    // set the leader
-    topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+    // create a local replica for topic2
+    val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
+    topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
+    replicaManager.checkpointHighWatermarks()
+    var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+    assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
     // set the highwatermark for local replica
-    topic2Partition0.leaderHW(Some(15L))
-    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+    topic2Partition0.getReplica().get.highWatermark = 15L
+    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark)
     // change the highwatermark for topic1
-    topic1Partition0.leaderHW(Some(10L))
-    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
-    replicaManager.checkpointHighwaterMarks()
+    topic1Partition0.getReplica().get.highWatermark = 10L
+    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
+    replicaManager.checkpointHighWatermarks()
     // verify checkpointed hw for topic 2
-    topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
     assertEquals(15L, topic2Partition0Hw)
     // verify checkpointed hw for topic 1
-    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
     assertEquals(10L, topic1Partition0Hw)
     EasyMock.verify(zkClient)
     EasyMock.verify(topic1Log0)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Aug 21 17:17:29 2012
@@ -23,8 +23,7 @@ import kafka.cluster.{Partition, Replica
 import org.easymock.EasyMock
 import kafka.log.Log
 import org.junit.Assert._
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils}
+import kafka.utils._
 
 class ISRExpirationTest extends JUnit3Suite {
 
@@ -37,35 +36,24 @@ class ISRExpirationTest extends JUnit3Su
 
   def testISRExpirationForStuckFollowers() {
     val time = new MockTime
-    // create leader replica
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
-    EasyMock.replay(log)
+    val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
 
-    // add one partition
-    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
-    // set remote replicas leo to something low, like 2
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
-
-    time.sleep(150)
-    leaderReplica.logEndOffset(Some(5L))
 
-    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
-    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    // let the follower catch up to 10
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
-    // add all replicas back to the ISR
-    partition0.inSyncReplicas ++= partition0.assignedReplicas()
-    assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
-
-    leaderReplica.logEndOffset(Some(5L))
-    // let the follower catch up only upto 3
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
+    // let some time pass
     time.sleep(150)
-    // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
+
+    // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't
     // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
-    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
     EasyMock.verify(log)
   }
@@ -73,104 +61,41 @@ class ISRExpirationTest extends JUnit3Su
   def testISRExpirationForSlowFollowers() {
     val time = new MockTime
     // create leader replica
-    val log = getLogWithHW(15L)
+    val log = getLogWithLogEndOffset(15L, 1)
     // add one partition
-    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L)
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L)
 
-    time.sleep(150)
-    leaderReplica.logEndOffset(Some(15L))
-    time.sleep(10)
-    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
-
-    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
+    // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than
+    // replicaMaxLagBytes, the follower is out of sync.
+    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
     EasyMock.verify(log)
   }
 
-  def testISRExpirationForMultiplePartitions() {
-    val time = new MockTime
-    // mock zkclient
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    EasyMock.replay(zkClient)
-    // create kafka scheduler
-    val scheduler = new KafkaScheduler(2)
-    // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
-    try {
-      val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
-      // create leader log
-      val log0 = getLogWithHW(5L)
-
-      // create leader and follower replicas
-      val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
-      val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
-
-      partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0)
-      // set the leader and its hw and the hw update time
-      partition0.leaderId(Some(configs.head.brokerId))
-      partition0.leaderHW(Some(5L))
-
-      // set the leo for non-leader replicas to something low
-      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
-
-      val log1 = getLogWithHW(15L)
-      // create leader and follower replicas for partition 1
-      val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet)
-      val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet)
-      val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition0)
-
-      partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1)
-      // set the leader and its hw and the hw update time
-      partition1.leaderId(Some(configs.head.brokerId))
-      partition1.leaderHW(Some(15L))
-
-      // set the leo for non-leader replicas to something low
-      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
-
-      time.sleep(150)
-      leaderReplicaPartition0.logEndOffset(Some(4L))
-      leaderReplicaPartition1.logEndOffset(Some(4L))
-      time.sleep(10)
-      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
-
-      val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
-      assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
-
-      val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
-      assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
-
-      EasyMock.verify(log0)
-      EasyMock.verify(log1)
-    }catch {
-      case e => e.printStackTrace()
-    }finally {
-      replicaManager.shutdown()
-    }
-  }
-
-  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
-                                               localLog: Log, leaderHW: Long): Partition = {
-    val partition = new Partition(topic, partitionId, time)
-    val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
+                                               localLog: Log): Partition = {
+    val leaderId=config.brokerId
+    val replicaManager = new ReplicaManager(config, time, null, null, null)
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
-    partition.assignedReplicas(Some(allReplicas.toSet))
+    allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
     // set the leader and its hw and the hw update time
-    partition.leaderId(Some(leaderId))
-    partition.leaderHW(Some(leaderHW))
+    partition.leaderReplicaIdOpt = Some(leaderId)
     partition
   }
 
-  private def getLogWithHW(hw: Long): Log = {
+  private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
+    EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls)
     EasyMock.replay(log1)
 
     log1
@@ -178,7 +103,7 @@ class ISRExpirationTest extends JUnit3Su
 
   private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, topic, time)
+      new Replica(config.brokerId, partition, time)
     }
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Aug 21 17:17:29 2012
@@ -8,7 +8,6 @@ import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.message.Message
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
-import org.junit.Test
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -41,7 +40,6 @@ class LogRecoveryTest extends JUnit3Suit
   var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
-  @Test
   def testHWCheckpointNoFailuresSingleLogSegment {
     // start both servers
     server1 = TestUtils.createServer(configProps1)
@@ -63,15 +61,18 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
     sendMessages(2)
-    // don't wait for follower to read the leader's hw
-    // shutdown the servers to allow the hw to be checkpointed
-    servers.map(server => server.shutdown())
+
+    // give some time for the follower 1 to record leader HW of 60
+    assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
+
+    servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
     producer.close()
     val leaderHW = hwFile1.read(topic, 0)
     assertEquals(60L, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
-    assertEquals(30L, followerHW)
-    servers.map(server => Utils.rm(server.config.logDir))
+    assertEquals(60L, followerHW)
+    servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)})
   }
 
   def testHWCheckpointWithFailuresSingleLogSegment {
@@ -124,13 +125,13 @@ class LogRecoveryTest extends JUnit3Suit
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
-    servers.map(server => server.shutdown())
+    servers.foreach(server => server.shutdown())
     producer.close()
     assertEquals(60L, hwFile1.read(topic, 0))
     assertEquals(60L, hwFile2.read(topic, 0))
-    servers.map(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -166,15 +167,15 @@ class LogRecoveryTest extends JUnit3Suit
     sendMessages(20)
     // give some time for follower 1 to record leader HW of 600
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.getReplica(topic, 0).get.highWatermark() == 600L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 600L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
-    servers.map(server => server.shutdown())
+    servers.foreach(server => server.shutdown())
     producer.close()
     val leaderHW = hwFile1.read(topic, 0)
     assertEquals(600L, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
     assertEquals(600L, followerHW)
-    servers.map(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -211,7 +212,7 @@ class LogRecoveryTest extends JUnit3Suit
     sendMessages(2)
     // allow some time for the follower to get the leader HW
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000))
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000))
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
@@ -234,13 +235,13 @@ class LogRecoveryTest extends JUnit3Suit
     sendMessages(2)
     // allow some time for the follower to get the leader HW
     assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() =>
-      server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000))
+      server1.replicaManager.getReplica(topic, 0).get.highWatermark == 120L, 1000))
     // shutdown the servers to allow the hw to be checkpointed
-    servers.map(server => server.shutdown())
+    servers.foreach(server => server.shutdown())
     producer.close()
     assertEquals(120L, hwFile1.read(topic, 0))
     assertEquals(120L, hwFile2.read(topic, 0))
-    servers.map(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDir))
   }
 
   private def sendMessages(numMessages: Int = 1) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Aug 21 17:17:29 2012
@@ -456,6 +456,19 @@ object TestUtils extends Logging {
     // should never hit here
     throw new RuntimeException("unexpected error")
   }
+
+  def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
+    val partitionOpt = server.replicaManager.getPartition(topic, partitionId)
+    partitionOpt match {
+      case None => false
+      case Some(partition) =>
+        val replicaOpt = partition.leaderReplicaIfLocal
+        replicaOpt match {
+          case None => false
+          case Some(_) => true
+        }
+    }
+  }
 }
 
 object ControllerTestUtils{

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Aug 21 17:17:29 2012
@@ -150,7 +150,7 @@ cleanup() {
 get_leader_brokerid() {
     log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1`
     info "found the log line: $log_line"
-    broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ',' '{print $1}'`
+    broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ':' '{print $1}'`
 
     return $broker_id
 }