You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/08 04:00:19 UTC

svn commit: r1310937 - in /incubator/kafka/trunk/core/src: main/scala/kafka/server/ test/resources/ test/scala/unit/kafka/log/ test/scala/unit/kafka/producer/ test/scala/unit/kafka/server/ test/scala/unit/kafka/zk/

Author: nehanarkhede
Date: Sun Apr  8 02:00:19 2012
New Revision: 1310937

URL: http://svn.apache.org/viewvc?rev=1310937&view=rev
Log:
KAFKA-320 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException; patched by nehanarkhede; reviewed by junrao and prashanth menon

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/test/resources/log4j.properties
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Sun Apr  8 02:00:19 2012
@@ -30,9 +30,9 @@ import java.io.File
  */
 class KafkaServer(val config: KafkaConfig) extends Logging {
   val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)
-  
-  private val shutdownLatch = new CountDownLatch(1)
+  private var isShuttingDown = new AtomicBoolean(false)
+  private var shutdownLatch = new CountDownLatch(1)
+
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   
   var socketServer: SocketServer = null
@@ -47,6 +47,8 @@ class KafkaServer(val config: KafkaConfi
    */
   def startup() {
     info("Starting Kafka server...")
+    isShuttingDown = new AtomicBoolean(false)
+    shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
     val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
     if (cleanShutDownFile.exists) {

Modified: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Sun Apr  8 02:00:19 2012
@@ -4,9 +4,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Sun Apr  8 02:00:19 2012
@@ -42,8 +42,6 @@ class LogOffsetTest extends JUnitSuite {
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
-  private val logger = Logger.getLogger(classOf[LogOffsetTest])
-  
   @Before
   def setUp() {
     val config: Properties = createBrokerConfig(1, brokerPort)

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Sun Apr  8 02:00:19 2012
@@ -187,6 +187,8 @@ class ProducerTest extends JUnitSuite {
       Assert.fail("Should fail with InvalidPartitionException")
     }catch {
       case e: InvalidPartitionException => // expected, do nothing
+    }finally {
+      richProducer.close()
     }
   }
 
@@ -202,17 +204,22 @@ class ProducerTest extends JUnitSuite {
       fail("Should fail with ClassCastException due to incompatible Encoder")
     } catch {
       case e: ClassCastException =>
+    }finally {
+      stringProducer1.close()
     }
 
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
     stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
+    stringProducer2.close()
 
     val messageProducer1 = new Producer[String, Message](config)
     try {
       messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
     } catch {
       case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
+    }finally {
+      messageProducer1.close()
     }
   }
 
@@ -423,8 +430,9 @@ class ProducerTest extends JUnitSuite {
       Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
     } catch {
       case e: Exception => fail("Not expected", e)
+    }finally {
+      producer.close
     }
-    producer.close
   }
 
   @Test
@@ -459,8 +467,9 @@ class ProducerTest extends JUnitSuite {
       Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
     } catch {
       case e: Exception => fail("Not expected")
+    }finally {
+      producer.close
     }
-    producer.close
   }
 
   @Test

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Sun Apr  8 02:00:19 2012
@@ -66,6 +66,7 @@ class ServerShutdownTest extends JUnitSu
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE)
       assertTrue(cleanShutDownFile.exists)
+      producer.close()
     }
 
 
@@ -103,6 +104,7 @@ class ServerShutdownTest extends JUnitSu
 
       server.shutdown()
       Utils.rm(server.config.logDir)
+      producer.close()
     }
 
   }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Apr  8 02:00:19 2012
@@ -20,9 +20,8 @@ package kafka.zk
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxn
 import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -31,8 +30,6 @@ class EmbeddedZookeeper(val connectStrin
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
-  val client = new ZkClient(connectString)
-  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Sun Apr  8 02:00:19 2012
@@ -40,7 +40,7 @@ class ZKLoadBalanceTest extends JUnit3Su
 
   def testLoadBalance() {
     // create the first partition
-    ZkUtils.setupPartition(zookeeper.client, 400, "broker1", 1111, "topic1", 1)
+    ZkUtils.setupPartition(zkClient, 400, "broker1", 1111, "topic1", 1)
     // add the first consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
@@ -74,7 +74,7 @@ class ZKLoadBalanceTest extends JUnit3Su
         (300, "broker3", 1111, "topic1", 2) )
 
       for ((brokerID, host, port, topic, nParts) <- brokers)
-        ZkUtils.setupPartition(zookeeper.client, brokerID, host, port, topic, nParts)
+        ZkUtils.setupPartition(zkClient, brokerID, host, port, topic, nParts)
 
 
       // wait a bit to make sure rebalancing logic is triggered
@@ -91,7 +91,7 @@ class ZKLoadBalanceTest extends JUnit3Su
 
     {
       // now delete a partition
-      ZkUtils.deletePartition(zookeeper.client, 400, "topic1")
+      ZkUtils.deletePartition(zkClient, 400, "topic1")
 
       // wait a bit to make sure rebalancing logic is triggered
       Thread.sleep(500)
@@ -110,11 +110,11 @@ class ZKLoadBalanceTest extends JUnit3Su
 
   private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     import scala.collection.JavaConversions
-    val children = zookeeper.client.getChildren(path)
+    val children = zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
     childrenAsSeq.map(partition =>
-      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+      (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
   private def checkSetEqual(actual : Seq[Tuple2[String,String]], expected : Seq[Tuple2[String,String]]) {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sun Apr  8 02:00:19 2012
@@ -18,18 +18,24 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String
   var zookeeper: EmbeddedZookeeper = null
+  var zkClient: ZkClient = null
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zookeeper.connectString)
+    zkClient.setZkSerializer(ZKStringSerializer)
     super.setUp
   }
 
   override def tearDown() {
     super.tearDown
+    zkClient.close()
     zookeeper.shutdown()
   }