You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/07 04:12:27 UTC

svn commit: r1310661 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/producer/ main/scala/kafka/server/ main/scala/kafka/utils/ test/resources/ test/scala/unit/kafka/admin/ test/scala/unit/kafka/consumer/ test/scala/unit/kafka/integration...

Author: nehanarkhede
Date: Sat Apr  7 02:12:26 2012
New Revision: 1310661

URL: http://svn.apache.org/viewvc?rev=1310661&view=rev
Log:
KAFKA-320 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException; patched by nehanarkhede; reviewed by junrao and prashanth menon

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Apr  7 02:12:26 2012
@@ -88,6 +88,7 @@ class ProducerPool(val config: ProducerC
       val iter = syncProducers.values.iterator
       while(iter.hasNext)
         iter.next.close
+      zkClient.close()
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Sat Apr  7 02:12:26 2012
@@ -17,13 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent._
-import java.util.concurrent.atomic._
 import java.io.File
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import kafka.cluster.Replica
+import java.util.concurrent._
+import atomic.AtomicBoolean
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -32,8 +32,8 @@ import kafka.cluster.Replica
 class KafkaServer(val config: KafkaConfig) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)  
-  private val shutdownLatch = new CountDownLatch(1)
+  private var isShuttingDown = new AtomicBoolean(false)
+  private var shutdownLatch = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -47,6 +47,8 @@ class KafkaServer(val config: KafkaConfi
    */
   def startup() {
     info("Starting Kafka server...")
+    isShuttingDown = new AtomicBoolean(false)
+    shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
     val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
     if (cleanShutDownFile.exists) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Apr  7 02:12:26 2012
@@ -29,7 +29,6 @@ import scala.collection.mutable
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
-import kafka.network.{BoundedByteBufferReceive, Receive, BoundedByteBufferSend, Request}
 
 /**
  * Helper functions!

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Sat Apr  7 02:12:26 2012
@@ -4,9 +4,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,5 +21,5 @@ log4j.appender.stdout.layout.ConversionP
 log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
-log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sat Apr  7 02:12:26 2012
@@ -134,18 +134,18 @@ class AdminTest extends JUnit3Suite with
       List("1", "2", "3"),
       List("1", "3", "4")      
       )
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
 
     val topic = "test"
     // create the topic
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
-    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
                                   .get.partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     }
     catch {
@@ -161,10 +161,10 @@ class AdminTest extends JUnit3Suite with
       List("1", "2", "3")
     )
     val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 
-    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata match {
       case Some(metadata) => assertEquals(topic, metadata.topic)
         assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr  7 02:12:26 2012
@@ -52,12 +52,14 @@ class ZookeeperConsumerConnectorTest ext
   val consumer2 = "consumer2"
   val consumer3 = "consumer3"
   val nMessages = 2
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
+  }
+
+  override def tearDown() {
+    super.tearDown()
   }
 
   def testBasic() {
@@ -98,8 +100,8 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
@@ -124,8 +126,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -149,8 +151,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -178,8 +180,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -209,8 +211,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -234,8 +236,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -438,11 +440,11 @@ class ZookeeperConsumerConnectorTest ext
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     import scala.collection.JavaConversions
-    val children = zookeeper.client.getChildren(path)
+    val children = zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
     childrenAsSeq.map(partition =>
-      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+      (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Sat Apr  7 02:12:26 2012
@@ -28,8 +28,8 @@ import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.{ProducerData, Producer}
-import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.utils.TestUtils
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -67,7 +67,7 @@ class FetcherTest extends JUnit3Suite wi
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
     Thread.sleep(100)
     assertQueueEmpty()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Sat Apr  7 02:12:26 2012
@@ -22,26 +22,22 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import kafka.log.LogManager
-import kafka.utils.TestUtils
 import junit.framework.Assert._
-import org.I0Itec.zkclient.ZkClient
-import TestUtils._
 import org.easymock.EasyMock
 import kafka.network.BoundedByteBufferReceive
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr  7 02:12:26 2012
@@ -26,8 +26,8 @@ import kafka.consumer.{ConsumerConfig, K
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.javaapi.producer.{ProducerData, Producer}
-import kafka.utils.{Utils, Logging, TestUtils}
 import kafka.utils.TestUtils._
+import kafka.utils.{Utils, Logging, TestUtils}
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -53,8 +53,8 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Apr  7 02:12:26 2012
@@ -22,10 +22,10 @@ import junit.framework.Assert._
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
+import kafka.utils._
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -48,10 +48,10 @@ class LogManagerTest extends JUnit3Suite
     logManager.startup
     logDir = logManager.logDir
 
-    TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+    TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
+    CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
   }
 
   override def tearDown() {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Sat Apr  7 02:12:26 2012
@@ -26,7 +26,6 @@ import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import org.apache.log4j._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
@@ -44,8 +43,6 @@ class LogOffsetTest extends JUnit3Suite 
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
-  private val logger = Logger.getLogger(classOf[LogOffsetTest])
-  
   @Before
   override def setUp() {
     super.setUp()
@@ -99,7 +96,7 @@ class LogOffsetTest extends JUnit3Suite 
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -137,7 +134,7 @@ class LogOffsetTest extends JUnit3Suite 
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     var offsetChanged = false
     for(i <- 1 to 14) {
@@ -158,7 +155,7 @@ class LogOffsetTest extends JUnit3Suite 
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -185,7 +182,7 @@ class LogOffsetTest extends JUnit3Suite 
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sat Apr  7 02:12:26 2012
@@ -30,23 +30,19 @@ import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import collection.Map
 import collection.mutable.ListBuffer
-import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Sat Apr  7 02:12:26 2012
@@ -27,10 +27,9 @@ import kafka.message.Message
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
+import kafka.utils._
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -42,13 +41,10 @@ class ProducerTest extends JUnit3Suite w
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-  private var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
-    zkClient = zookeeper.client
-
     val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
     val config1 = new KafkaConfig(props1) {
       override val numPartitions = 4
@@ -166,7 +162,7 @@ class ProducerTest extends JUnit3Suite w
 
     // restart server 1
     server1.startup()
-    Thread.sleep(500)
+    Thread.sleep(100)
 
     try {
       // cross check if broker 1 got the messages

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Apr  7 02:12:26 2012
@@ -116,10 +116,10 @@ class SyncProducerTest extends JUnit3Sui
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owner by broker
-    val zkClient = zookeeper.client
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
 
+    Thread.sleep(500)
     val response2 = producer.send(request)
     Assert.assertEquals(request.correlationId, response2.correlationId)
     Assert.assertEquals(response2.errors.length, response2.offsets.length)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Sat Apr  7 02:12:26 2012
@@ -20,10 +20,9 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
+import kafka.utils.{Utils, TestUtils}
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -37,13 +36,10 @@ class LeaderElectionTest extends JUnit3S
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
 
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
-
     // start both servers
     val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Sat Apr  7 02:12:26 2012
@@ -48,7 +48,7 @@ class ServerShutdownTest extends JUnit3S
       server.startup()
 
       // create topic
-      CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "0")
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
       val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
 
@@ -60,6 +60,7 @@ class ServerShutdownTest extends JUnit3S
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
       assertTrue(cleanShutDownFile.exists)
+      producer.close()
     }
 
 
@@ -73,7 +74,7 @@ class ServerShutdownTest extends JUnit3S
       val server = new KafkaServer(config)
       server.startup()
 
-      waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
+      waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
 
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -97,6 +98,7 @@ class ServerShutdownTest extends JUnit3S
 
       server.shutdown()
       Utils.rm(server.config.logDir)
+      producer.close()
     }
 
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sat Apr  7 02:12:26 2012
@@ -20,9 +20,8 @@ package kafka.zk
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxn
 import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -32,8 +31,6 @@ class EmbeddedZookeeper(val connectStrin
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
-  val client = new ZkClient(connectString)
-  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sat Apr  7 02:12:26 2012
@@ -18,19 +18,24 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestZKUtils
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, TestZKUtils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
+  var zkClient: ZkClient = null
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zookeeper.connectString)
+    zkClient.setZkSerializer(ZKStringSerializer)
     super.setUp
   }
 
   override def tearDown() {
     super.tearDown
+    zkClient.close()
     zookeeper.shutdown()
   }