You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/07 04:12:27 UTC
svn commit: r1310661 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/producer/ main/scala/kafka/server/ main/scala/kafka/utils/
test/resources/ test/scala/unit/kafka/admin/
test/scala/unit/kafka/consumer/ test/scala/unit/kafka/integration...
Author: nehanarkhede
Date: Sat Apr 7 02:12:26 2012
New Revision: 1310661
URL: http://svn.apache.org/viewvc?rev=1310661&view=rev
Log:
KAFKA-320 testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException; patched by nehanarkhede; reviewed by junrao and prashanth menon
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Sat Apr 7 02:12:26 2012
@@ -88,6 +88,7 @@ class ProducerPool(val config: ProducerC
val iter = syncProducers.values.iterator
while(iter.hasNext)
iter.next.close
+ zkClient.close()
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Sat Apr 7 02:12:26 2012
@@ -17,13 +17,13 @@
package kafka.server
-import java.util.concurrent._
-import java.util.concurrent.atomic._
import java.io.File
import kafka.network.{SocketServerStats, SocketServer}
import kafka.log.LogManager
import kafka.utils._
import kafka.cluster.Replica
+import java.util.concurrent._
+import atomic.AtomicBoolean
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -32,8 +32,8 @@ import kafka.cluster.Replica
class KafkaServer(val config: KafkaConfig) extends Logging {
val CleanShutdownFile = ".kafka_cleanshutdown"
- private val isShuttingDown = new AtomicBoolean(false)
- private val shutdownLatch = new CountDownLatch(1)
+ private var isShuttingDown = new AtomicBoolean(false)
+ private var shutdownLatch = new CountDownLatch(1)
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -47,6 +47,8 @@ class KafkaServer(val config: KafkaConfi
*/
def startup() {
info("Starting Kafka server...")
+ isShuttingDown = new AtomicBoolean(false)
+ shutdownLatch = new CountDownLatch(1)
var needRecovery = true
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
if (cleanShutDownFile.exists) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Apr 7 02:12:26 2012
@@ -29,7 +29,6 @@ import scala.collection.mutable
import kafka.message.{NoCompressionCodec, CompressionCodec}
import org.I0Itec.zkclient.ZkClient
import java.util.{Random, Properties}
-import kafka.network.{BoundedByteBufferReceive, Receive, BoundedByteBufferSend, Request}
/**
* Helper functions!
Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Sat Apr 7 02:12:26 2012
@@ -4,9 +4,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,5 +21,5 @@ log4j.appender.stdout.layout.ConversionP
log4j.logger.kafka=ERROR
# zkclient can be verbose, during debugging it is common to adjust it separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
-log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sat Apr 7 02:12:26 2012
@@ -134,18 +134,18 @@ class AdminTest extends JUnit3Suite with
List("1", "2", "3"),
List("1", "3", "4")
)
- TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+ TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
val topic = "test"
// create the topic
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
- val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+ AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
.get.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
try {
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+ AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
}
catch {
@@ -161,10 +161,10 @@ class AdminTest extends JUnit3Suite with
List("1", "2", "3")
)
val topic = "auto-topic"
- TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+ TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
+ AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
- val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+ val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
newTopicMetadata match {
case Some(metadata) => assertEquals(topic, metadata.topic)
assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr 7 02:12:26 2012
@@ -52,12 +52,14 @@ class ZookeeperConsumerConnectorTest ext
val consumer2 = "consumer2"
val consumer3 = "consumer3"
val nMessages = 2
- var zkClient: ZkClient = null
override def setUp() {
super.setUp()
dirs = new ZKGroupTopicDirs(group, topic)
- zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
+ }
+
+ override def tearDown() {
+ super.tearDown()
}
def testBasic() {
@@ -98,8 +100,8 @@ class ZookeeperConsumerConnectorTest ext
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
assertEquals(sentMessages1.size, receivedMessages1.size)
@@ -124,8 +126,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -149,8 +151,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -178,8 +180,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
@@ -209,8 +211,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -234,8 +236,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -438,11 +440,11 @@ class ZookeeperConsumerConnectorTest ext
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
import scala.collection.JavaConversions
- val children = zookeeper.client.getChildren(path)
+ val children = zkClient.getChildren(path)
Collections.sort(children)
val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
childrenAsSeq.map(partition =>
- (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+ (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Sat Apr 7 02:12:26 2012
@@ -28,8 +28,8 @@ import kafka.server._
import org.scalatest.junit.JUnit3Suite
import kafka.integration.KafkaServerTestHarness
import kafka.producer.{ProducerData, Producer}
-import kafka.utils.TestUtils
import kafka.utils.TestUtils._
+import kafka.utils.TestUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -67,7 +67,7 @@ class FetcherTest extends JUnit3Suite wi
def testFetcher() {
val perNode = 2
var count = sendMessages(perNode)
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
fetch(count)
Thread.sleep(100)
assertQueueEmpty()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Sat Apr 7 02:12:26 2012
@@ -22,26 +22,22 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import java.nio.ByteBuffer
import kafka.log.LogManager
-import kafka.utils.TestUtils
import junit.framework.Assert._
-import org.I0Itec.zkclient.ZkClient
-import TestUtils._
import org.easymock.EasyMock
import kafka.network.BoundedByteBufferReceive
import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
import kafka.cluster.Broker
import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
- var zkClient: ZkClient = null
var brokers: Seq[Broker] = null
override def setUp() {
super.setUp()
- zkClient = zookeeper.client
- // create brokers in zookeeper
brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Sat Apr 7 02:12:26 2012
@@ -26,8 +26,8 @@ import kafka.consumer.{ConsumerConfig, K
import org.apache.log4j.{Level, Logger}
import kafka.message._
import kafka.javaapi.producer.{ProducerData, Producer}
-import kafka.utils.{Utils, Logging, TestUtils}
import kafka.utils.TestUtils._
+import kafka.utils.{Utils, Logging, TestUtils}
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@@ -53,8 +53,8 @@ class ZookeeperConsumerConnectorTest ext
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
- waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sat Apr 7 02:12:26 2012
@@ -22,10 +22,10 @@ import junit.framework.Assert._
import org.junit.Test
import kafka.common.OffsetOutOfRangeException
import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
import kafka.server.KafkaConfig
+import kafka.utils._
class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -48,10 +48,10 @@ class LogManagerTest extends JUnit3Suite
logManager.startup
logDir = logManager.logDir
- TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+ TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
+ CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
}
override def tearDown() {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Sat Apr 7 02:12:26 2012
@@ -26,7 +26,6 @@ import collection.mutable.WrappedArray
import kafka.consumer.SimpleConsumer
import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import org.apache.log4j._
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
@@ -44,8 +43,6 @@ class LogOffsetTest extends JUnit3Suite
val brokerPort: Int = 9099
var simpleConsumer: SimpleConsumer = null
- private val logger = Logger.getLogger(classOf[LogOffsetTest])
-
@Before
override def setUp() {
super.setUp()
@@ -99,7 +96,7 @@ class LogOffsetTest extends JUnit3Suite
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
@@ -137,7 +134,7 @@ class LogOffsetTest extends JUnit3Suite
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
var offsetChanged = false
for(i <- 1 to 14) {
@@ -158,7 +155,7 @@ class LogOffsetTest extends JUnit3Suite
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
@@ -185,7 +182,7 @@ class LogOffsetTest extends JUnit3Suite
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in zookeeper as owners of partitions for this test
- CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(topic, part)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sat Apr 7 02:12:26 2012
@@ -30,23 +30,19 @@ import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import collection.Map
import collection.mutable.ListBuffer
-import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
- var zkClient: ZkClient = null
var brokers: Seq[Broker] = null
override def setUp() {
super.setUp()
- zkClient = zookeeper.client
- // create brokers in zookeeper
brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Sat Apr 7 02:12:26 2012
@@ -27,10 +27,9 @@ import kafka.message.Message
import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
import kafka.zk.ZooKeeperTestHarness
import org.apache.log4j.{Level, Logger}
-import org.I0Itec.zkclient.ZkClient
import org.junit.Assert._
import org.junit.Test
-import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
+import kafka.utils._
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId1 = 0
@@ -42,13 +41,10 @@ class ProducerTest extends JUnit3Suite w
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- private var zkClient: ZkClient = null
override def setUp() {
super.setUp()
// set up 2 brokers with 4 partitions each
- zkClient = zookeeper.client
-
val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
val config1 = new KafkaConfig(props1) {
override val numPartitions = 4
@@ -166,7 +162,7 @@ class ProducerTest extends JUnit3Suite w
// restart server 1
server1.startup()
- Thread.sleep(500)
+ Thread.sleep(100)
try {
// cross check if broker 1 got the messages
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Apr 7 02:12:26 2012
@@ -116,10 +116,10 @@ class SyncProducerTest extends JUnit3Sui
response.offsets.foreach(Assert.assertEquals(-1L, _))
// #2 - test that we get correct offsets when partition is owned by broker
- val zkClient = zookeeper.client
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+ Thread.sleep(500)
val response2 = producer.send(request)
Assert.assertEquals(request.correlationId, response2.correlationId)
Assert.assertEquals(response2.errors.length, response2.offsets.length)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Sat Apr 7 02:12:26 2012
@@ -20,10 +20,9 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
-import org.I0Itec.zkclient.ZkClient
import kafka.utils.TestUtils._
import junit.framework.Assert._
-import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
+import kafka.utils.{Utils, TestUtils}
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -37,13 +36,10 @@ class LeaderElectionTest extends JUnit3S
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- var zkClient: ZkClient = null
override def setUp() {
super.setUp()
- zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
-
// start both servers
val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Sat Apr 7 02:12:26 2012
@@ -48,7 +48,7 @@ class ServerShutdownTest extends JUnit3S
server.startup()
// create topic
- CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "0")
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
@@ -60,6 +60,7 @@ class ServerShutdownTest extends JUnit3S
server.shutdown()
val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
assertTrue(cleanShutDownFile.exists)
+ producer.close()
}
@@ -73,7 +74,7 @@ class ServerShutdownTest extends JUnit3S
val server = new KafkaServer(config)
server.startup()
- waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -97,6 +98,7 @@ class ServerShutdownTest extends JUnit3S
server.shutdown()
Utils.rm(server.config.logDir)
+ producer.close()
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sat Apr 7 02:12:26 2012
@@ -20,9 +20,8 @@ package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxn
import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
@@ -32,8 +31,6 @@ class EmbeddedZookeeper(val connectStrin
val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
factory.startup(zookeeper)
- val client = new ZkClient(connectString)
- client.setZkSerializer(ZKStringSerializer)
def shutdown() {
factory.shutdown()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1310661&r1=1310660&r2=1310661&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sat Apr 7 02:12:26 2012
@@ -18,19 +18,24 @@
package kafka.zk
import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestZKUtils
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, TestZKUtils}
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
var zookeeper: EmbeddedZookeeper = null
+ var zkClient: ZkClient = null
override def setUp() {
zookeeper = new EmbeddedZookeeper(zkConnect)
+ zkClient = new ZkClient(zookeeper.connectString)
+ zkClient.setZkSerializer(ZKStringSerializer)
super.setUp
}
override def tearDown() {
super.tearDown
+ zkClient.close()
zookeeper.shutdown()
}