You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/08 04:00:19 UTC
svn commit: r1310937 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/server/ test/resources/ test/scala/unit/kafka/log/
test/scala/unit/kafka/producer/ test/scala/unit/kafka/server/
test/scala/unit/kafka/zk/
Author: nehanarkhede
Date: Sun Apr 8 02:00:19 2012
New Revision: 1310937
URL: http://svn.apache.org/viewvc?rev=1310937&view=rev
Log:
KAFKA-320 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException; patched by nehanarkhede; reviewed by junrao and prashanth menon
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/test/resources/log4j.properties
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Sun Apr 8 02:00:19 2012
@@ -30,9 +30,9 @@ import java.io.File
*/
class KafkaServer(val config: KafkaConfig) extends Logging {
val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
- private val isShuttingDown = new AtomicBoolean(false)
-
- private val shutdownLatch = new CountDownLatch(1)
+ private var isShuttingDown = new AtomicBoolean(false)
+ private var shutdownLatch = new CountDownLatch(1)
+
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
@@ -47,6 +47,8 @@ class KafkaServer(val config: KafkaConfi
*/
def startup() {
info("Starting Kafka server...")
+ isShuttingDown = new AtomicBoolean(false)
+ shutdownLatch = new CountDownLatch(1)
var needRecovery = true
val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
if (cleanShutDownFile.exists) {
Modified: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Sun Apr 8 02:00:19 2012
@@ -4,9 +4,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Sun Apr 8 02:00:19 2012
@@ -42,8 +42,6 @@ class LogOffsetTest extends JUnitSuite {
val brokerPort: Int = 9099
var simpleConsumer: SimpleConsumer = null
- private val logger = Logger.getLogger(classOf[LogOffsetTest])
-
@Before
def setUp() {
val config: Properties = createBrokerConfig(1, brokerPort)
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Sun Apr 8 02:00:19 2012
@@ -187,6 +187,8 @@ class ProducerTest extends JUnitSuite {
Assert.fail("Should fail with InvalidPartitionException")
}catch {
case e: InvalidPartitionException => // expected, do nothing
+ }finally {
+ richProducer.close()
}
}
@@ -202,17 +204,22 @@ class ProducerTest extends JUnitSuite {
fail("Should fail with ClassCastException due to incompatible Encoder")
} catch {
case e: ClassCastException =>
+ }finally {
+ stringProducer1.close()
}
props.put("serializer.class", "kafka.serializer.StringEncoder")
val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
+ stringProducer2.close()
val messageProducer1 = new Producer[String, Message](config)
try {
messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
} catch {
case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
+ }finally {
+ messageProducer1.close()
}
}
@@ -423,8 +430,9 @@ class ProducerTest extends JUnitSuite {
Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected", e)
+ }finally {
+ producer.close
}
- producer.close
}
@Test
@@ -459,8 +467,9 @@ class ProducerTest extends JUnitSuite {
Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
} catch {
case e: Exception => fail("Not expected")
+ }finally {
+ producer.close
}
- producer.close
}
@Test
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Sun Apr 8 02:00:19 2012
@@ -66,6 +66,7 @@ class ServerShutdownTest extends JUnitSu
server.shutdown()
val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE)
assertTrue(cleanShutDownFile.exists)
+ producer.close()
}
@@ -103,6 +104,7 @@ class ServerShutdownTest extends JUnitSu
server.shutdown()
Utils.rm(server.config.logDir)
+ producer.close()
}
}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Apr 8 02:00:19 2012
@@ -20,9 +20,8 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxn
import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
@@ -31,8 +30,6 @@ class EmbeddedZookeeper(val connectStrin
val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
factory.startup(zookeeper)
- val client = new ZkClient(connectString)
- client.setZkSerializer(ZKStringSerializer)
def shutdown() {
factory.shutdown()
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Sun Apr 8 02:00:19 2012
@@ -40,7 +40,7 @@ class ZKLoadBalanceTest extends JUnit3Su
def testLoadBalance() {
// create the first partition
- ZkUtils.setupPartition(zookeeper.client, 400, "broker1", 1111, "topic1", 1)
+ ZkUtils.setupPartition(zkClient, 400, "broker1", 1111, "topic1", 1)
// add the first consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
@@ -74,7 +74,7 @@ class ZKLoadBalanceTest extends JUnit3Su
(300, "broker3", 1111, "topic1", 2) )
for ((brokerID, host, port, topic, nParts) <- brokers)
- ZkUtils.setupPartition(zookeeper.client, brokerID, host, port, topic, nParts)
+ ZkUtils.setupPartition(zkClient, brokerID, host, port, topic, nParts)
// wait a bit to make sure rebalancing logic is triggered
@@ -91,7 +91,7 @@ class ZKLoadBalanceTest extends JUnit3Su
{
// now delete a partition
- ZkUtils.deletePartition(zookeeper.client, 400, "topic1")
+ ZkUtils.deletePartition(zkClient, 400, "topic1")
// wait a bit to make sure rebalancing logic is triggered
Thread.sleep(500)
@@ -110,11 +110,11 @@ class ZKLoadBalanceTest extends JUnit3Su
private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
import scala.collection.JavaConversions
- val children = zookeeper.client.getChildren(path)
+ val children = zkClient.getChildren(path)
Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
childrenAsSeq.map(partition =>
- (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+ (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
private def checkSetEqual(actual : Seq[Tuple2[String,String]], expected : Seq[Tuple2[String,String]]) {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1310937&r1=1310936&r2=1310937&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sun Apr 8 02:00:19 2012
@@ -18,18 +18,24 @@
package kafka.zk
import org.scalatest.junit.JUnit3Suite
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String
var zookeeper: EmbeddedZookeeper = null
+ var zkClient: ZkClient = null
override def setUp() {
zookeeper = new EmbeddedZookeeper(zkConnect)
+ zkClient = new ZkClient(zookeeper.connectString)
+ zkClient.setZkSerializer(ZKStringSerializer)
super.setUp
}
override def tearDown() {
super.tearDown
+ zkClient.close()
zookeeper.shutdown()
}