You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/09 23:04:33 UTC
svn commit: r1242552 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/consumer/ main/scala/kafka/message/
main/scala/kafka/producer/ main/scala/kafka/tools/ test/scala/other/kafka/
test/scala/unit/kafka/integration/ test/scala/unit/kafka/javaapi/...
Author: nehanarkhede
Date: Thu Feb 9 22:04:32 2012
New Revision: 1242552
URL: http://svn.apache.org/viewvc?rev=1242552&view=rev
Log:
KAFKA-262 Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own; patched by Neha Narkhede; reviewed by Jun Rao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Feb 9 22:04:32 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic._
import scala.collection._
import kafka.cluster._
import kafka.utils._
+import mutable.ListBuffer
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
@@ -91,7 +92,7 @@ private[kafka] class ZookeeperConsumerCo
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
- private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+ private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
// queues : (topic,consumerThreadId) -> queue
private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
@@ -373,8 +374,6 @@ private[kafka] class ZookeeperConsumerCo
kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
extends IZkChildListener {
private val dirs = new ZKGroupDirs(group)
- private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
- private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -390,25 +389,12 @@ private[kafka] class ZookeeperConsumerCo
deletePath(zkClient, znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
+ topicRegistry.remove(topic)
}
}
- private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
- newPartMap: Map[String,List[String]],
- oldPartMap: Map[String,List[String]],
- newConsumerMap: Map[String,List[String]],
- oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
- var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
- for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
- if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
- relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
- relevantTopicThreadIdsMap
- }
-
def resetState() {
topicRegistry.clear
- oldConsumersPerTopicMap.clear
- oldPartitionsPerTopicMap.clear
}
def syncedRebalance() {
@@ -437,11 +423,7 @@ private[kafka] class ZookeeperConsumerCo
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
- oldConsumersPerTopicMap.clear()
- oldPartitionsPerTopicMap.clear()
}
- // commit offsets
- commitOffsets()
// stop all fetchers and clear all the queues to avoid data duplication
closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
// release all partitions, reset state and retry
@@ -457,14 +439,6 @@ private[kafka] class ZookeeperConsumerCo
val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
- val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
- if (relevantTopicThreadIdsMap.size <= 0) {
- info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
- format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
- debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
- debug("Consumers per topic cache " + oldConsumersPerTopicMap)
- return true
- }
/**
* fetchers must be stopped to avoid data duplication, since if the current
@@ -472,14 +446,15 @@ private[kafka] class ZookeeperConsumerCo
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
- closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
+ closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
releasePartitionOwnership()
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
- for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
- topicRegistry.remove(topic)
- topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
+ var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+
+ for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+ currentTopicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
@@ -507,11 +482,9 @@ private[kafka] class ZookeeperConsumerCo
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
- val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
- if (!ownPartition)
- return false
- else // record the partition ownership decision
- partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+ addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+ // record the partition ownership decision
+ partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
}
@@ -525,8 +498,7 @@ private[kafka] class ZookeeperConsumerCo
info("Updating the cache")
debug("Partitions per topic cache " + partitionsPerTopicMap)
debug("Consumers per topic cache " + consumersPerTopicMap)
- oldPartitionsPerTopicMap = partitionsPerTopicMap
- oldConsumersPerTopicMap = consumersPerTopicMap
+ topicRegistry = currentTopicRegistry
updateFetcher(cluster, kafkaMessageStreams)
true
}else
@@ -579,27 +551,6 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
- topic: String, consumerThreadId: String) : Boolean = {
- val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- // check if some other consumer owns this partition at this time
- val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
- if(currentPartitionOwner != null) {
- if(currentPartitionOwner.equals(consumerThreadId)) {
- info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
- addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
- true
- }
- else {
- info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
- false
- }
- } else {
- addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
- true
- }
- }
-
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
@@ -620,15 +571,17 @@ private[kafka] class ZookeeperConsumerCo
case e2 => throw e2
}
}
- val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
- if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */
+ val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
+ /* even if one of the partition ownership attempt has failed, return false */
+ if(hasPartitionOwnershipFailed > 0) false
else true
}
- private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
+ private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
+ topicDirs: ZKGroupTopicDirs, partitionString: String,
topic: String, consumerThreadId: String) {
val partition = Partition.parse(partitionString)
- val partTopicInfoMap = topicRegistry.get(topic)
+ val partTopicInfoMap = currentTopicRegistry.get(topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val offsetString = readDataMaybeNull(zkClient, znode)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Thu Feb 9 22:04:32 2012
@@ -49,7 +49,7 @@ class GZIPCompression(inputStream: Input
}
override def read(a: Array[Byte]): Int = {
- gzipIn.read(a)
+ gzipIn.read(a)
}
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Feb 9 22:04:32 2012
@@ -18,13 +18,13 @@
package kafka.producer
import async.MissingConfigException
-import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
+import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
import kafka.serializer.Encoder
import java.util.{Properties, Date}
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.Message
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu Feb 9 22:04:32 2012
@@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends L
// check if the rebalancing operation succeeded.
try {
if(validateRebalancingOperation(zkClient, group))
- info("Rebalance operation successful !")
+ println("Rebalance operation successful !")
else
- error("Rebalance operation failed !")
+ println("Rebalance operation failed !")
} catch {
case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
}
@@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends L
rebalanceSucceeded
}
-
-
-}
\ No newline at end of file
+}
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala Thu Feb 9 22:04:32 2012
@@ -18,7 +18,7 @@
package kafka
import message.Message
-import org.apache.log4j.{Logger, PropertyConfigurator}
+import org.apache.log4j.PropertyConfigurator
import kafka.utils.Logging
import serializer.Encoder
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Thu Feb 9 22:04:32 2012
@@ -17,8 +17,6 @@
package kafka
-import java.net.URI
-import java.util.Arrays.asList
import java.io._
import java.nio._
import java.nio.channels._
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Thu Feb 9 22:04:32 2012
@@ -17,7 +17,6 @@
package kafka.log
-import kafka.log._
import kafka.message._
import kafka.utils.{TestUtils, Utils}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Thu Feb 9 22:04:32 2012
@@ -22,10 +22,9 @@ import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.Logger
import java.util.Properties
import kafka.consumer.SimpleConsumer
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.TestUtils
import kafka.api.{OffsetRequest, FetchRequest}
import junit.framework.Assert._
-import java.io.File
class BackwardsCompatibilityTest extends JUnit3Suite {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Thu Feb 9 22:04:32 2012
@@ -17,11 +17,6 @@
package kafka.integration
-import java.util.Properties
-import junit.framework.Assert._
-import kafka.producer._
-import kafka.consumer._
-import kafka.message._
import kafka.server._
import kafka.utils.{Utils, TestUtils}
import org.scalatest.junit.JUnit3Suite
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Thu Feb 9 22:04:32 2012
@@ -35,7 +35,6 @@ trait BaseMessageSetTestCases extends JU
@Test
def testWrittenEqualsRead {
- import scala.collection.JavaConversions._
val messageSet = createMessageSet(messages)
TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Thu Feb 9 22:04:32 2012
@@ -18,7 +18,6 @@
package kafka.javaapi.message
import java.nio._
-import junit.framework.TestCase
import junit.framework.Assert._
import org.junit.Test
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Thu Feb 9 22:04:32 2012
@@ -24,7 +24,6 @@ import kafka.zk.EmbeddedZookeeper
import kafka.utils.{TestZKUtils, TestUtils}
import org.junit.{After, Before, Test}
import junit.framework.Assert
-import collection.mutable.HashMap
import org.easymock.EasyMock
import kafka.utils.Utils
import java.util.concurrent.ConcurrentHashMap
@@ -34,7 +33,7 @@ import org.scalatest.junit.JUnitSuite
import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
import kafka.producer.ProducerPool
import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.producer.async.AsyncProducer
import kafka.javaapi.Implicits._
import kafka.serializer.{StringEncoder, Encoder}
import kafka.javaapi.consumer.SimpleConsumer
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Thu Feb 9 22:04:32 2012
@@ -17,11 +17,11 @@
package kafka.javaapi.producer
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
import kafka.utils.SystemTime
import kafka.utils.TestUtils
import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
+import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import java.util.Properties
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Thu Feb 9 22:04:32 2012
@@ -18,14 +18,13 @@
package kafka.log
import java.io._
-import java.nio._
import java.util.ArrayList
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, TestUtils, Range}
import kafka.common.OffsetOutOfRangeException
-import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
class LogTest extends JUnitSuite {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Feb 9 22:04:32 2012
@@ -23,13 +23,12 @@ import java.util.Properties
import java.io.File
import kafka.consumer.SimpleConsumer
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging}
import kafka.zk.EmbeddedZookeeper
import junit.framework.Assert._
import kafka.api.FetchRequest
import kafka.serializer.Encoder
-import kafka.message.{MessageSet, Message}
+import kafka.message.Message
import kafka.producer.async.MissingConfigException
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Thu Feb 9 22:04:32 2012
@@ -17,7 +17,6 @@
package kafka.message
-import java.util.Arrays
import junit.framework.Assert._
import kafka.utils.TestUtils._
import org.scalatest.junit.JUnitSuite
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Thu Feb 9 22:04:32 2012
@@ -18,12 +18,9 @@
package kafka.message
import java.nio._
-import java.util.Arrays
-import junit.framework.TestCase
import junit.framework.Assert._
import kafka.utils.TestUtils._
import org.junit.Test
-import kafka.message._
class FileMessageSetTest extends BaseMessageSetTestCases {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala Thu Feb 9 22:04:32 2012
@@ -19,9 +19,6 @@ package kafka.message
import java.util._
import java.nio._
-import java.nio.channels._
-import java.io._
-import junit.framework.TestCase
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{Before, Test}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu Feb 9 22:04:32 2012
@@ -19,13 +19,9 @@ package kafka.network;
import java.net._
import java.io._
-import java.nio._
-import java.nio.channels._
import org.junit._
-import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import kafka.utils.TestUtils
-import kafka.network._
import java.util.Random
import org.apache.log4j._
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Feb 9 22:04:32 2012
@@ -17,7 +17,7 @@
package kafka.producer
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
import java.util.Properties
import org.easymock.EasyMock
import kafka.api.ProducerRequest
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Thu Feb 9 22:04:32 2012
@@ -17,14 +17,13 @@
package kafka.producer
-import async.{AsyncProducerConfig, AsyncProducer}
+import async.AsyncProducer
import java.util.Properties
import org.apache.log4j.{Logger, Level}
import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
import junit.framework.Assert
-import collection.mutable.HashMap
import org.easymock.EasyMock
import java.util.concurrent.ConcurrentHashMap
import kafka.cluster.Partition
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Feb 9 22:04:32 2012
@@ -17,11 +17,11 @@
package kafka.producer
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
import kafka.utils.SystemTime
import kafka.utils.TestUtils
import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
+import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.common.MessageSizeTooLargeException
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Feb 9 22:04:32 2012
@@ -20,13 +20,11 @@ import kafka.utils.TestUtils
import java.io.File
import kafka.utils.Utils
import kafka.api.FetchRequest
-import kafka.integration.ProducerConsumerTestHarness
import kafka.producer.{SyncProducer, SyncProducerConfig}
import kafka.consumer.SimpleConsumer
import java.util.Properties
import org.scalatest.junit.JUnitSuite
-import junit.framework.{Assert, TestCase}
-import org.junit.{After, Before, Test}
+import org.junit.Test
import junit.framework.Assert._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}