You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/09 23:04:33 UTC

svn commit: r1242552 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala/kafka/tools/ test/scala/other/kafka/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/javaapi/...

Author: nehanarkhede
Date: Thu Feb  9 22:04:32 2012
New Revision: 1242552

URL: http://svn.apache.org/viewvc?rev=1242552&view=rev
Log:
KAFKA-262 Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own; patched by Neha Narkhede; reviewed by Jun Rao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Feb  9 22:04:32 2012
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic._
 import scala.collection._
 import kafka.cluster._
 import kafka.utils._
+import mutable.ListBuffer
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
@@ -91,7 +92,7 @@ private[kafka] class ZookeeperConsumerCo
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
   private var zkClient: ZkClient = null
-  private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+  private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
@@ -373,8 +374,6 @@ private[kafka] class ZookeeperConsumerCo
                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
     extends IZkChildListener {
     private val dirs = new ZKGroupDirs(group)
-    private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
-    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -390,25 +389,12 @@ private[kafka] class ZookeeperConsumerCo
           deletePath(zkClient, znode)
           debug("Consumer " + consumerIdString + " releasing " + znode)
         }
+        topicRegistry.remove(topic)
       }
     }
 
-    private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
-                                    newPartMap: Map[String,List[String]],
-                                    oldPartMap: Map[String,List[String]],
-                                    newConsumerMap: Map[String,List[String]],
-                                    oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
-      var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
-      for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
-        if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
-          relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
-      relevantTopicThreadIdsMap
-    }
-
     def resetState() {
       topicRegistry.clear
-      oldConsumersPerTopicMap.clear
-      oldPartitionsPerTopicMap.clear
     }
 
     def syncedRebalance() {
@@ -437,11 +423,7 @@ private[kafka] class ZookeeperConsumerCo
               /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                * clear the cache */
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
-              oldConsumersPerTopicMap.clear()
-              oldPartitionsPerTopicMap.clear()
           }
-          // commit offsets
-          commitOffsets()
           // stop all fetchers and clear all the queues to avoid data duplication
           closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
           // release all partitions, reset state and retry
@@ -457,14 +439,6 @@ private[kafka] class ZookeeperConsumerCo
       val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
-      val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
-      if (relevantTopicThreadIdsMap.size <= 0) {
-        info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
-          format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
-        debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
-        debug("Consumers per topic cache " + oldConsumersPerTopicMap)
-        return true
-      }
 
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
@@ -472,14 +446,15 @@ private[kafka] class ZookeeperConsumerCo
        * But if we don't stop the fetchers first, this consumer would continue returning data for released
        * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
        */
-      closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
+      closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap)
 
       releasePartitionOwnership()
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
-      for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
-        topicRegistry.remove(topic)
-        topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
+      var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
+
+      for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+        currentTopicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
@@ -507,11 +482,9 @@ private[kafka] class ZookeeperConsumerCo
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
-              if (!ownPartition)
-                return false
-              else // record the partition ownership decision
-                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+              // record the partition ownership decision
+              partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
           }
         }
@@ -525,8 +498,7 @@ private[kafka] class ZookeeperConsumerCo
         info("Updating the cache")
         debug("Partitions per topic cache " + partitionsPerTopicMap)
         debug("Consumers per topic cache " + consumersPerTopicMap)
-        oldPartitionsPerTopicMap = partitionsPerTopicMap
-        oldConsumersPerTopicMap = consumersPerTopicMap
+        topicRegistry = currentTopicRegistry
         updateFetcher(cluster, kafkaMessageStreams)
         true
       }else
@@ -579,27 +551,6 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
-                                 topic: String, consumerThreadId: String) : Boolean = {
-      val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-      // check if some other consumer owns this partition at this time
-      val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
-      if(currentPartitionOwner != null) {
-        if(currentPartitionOwner.equals(consumerThreadId)) {
-          info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
-          addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-          true
-        }
-        else {
-          info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
-          false
-        }
-      } else {
-        addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
-        true
-      }
-    }
-
     private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1._1
@@ -620,15 +571,17 @@ private[kafka] class ZookeeperConsumerCo
           case e2 => throw e2
         }
       }
-      val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
-      if(success > 0) false       /* even if one of the partition ownership attempt has failed, return false */
+      val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
+      /* if even one of the partition ownership attempts has failed, return false */
+      if(hasPartitionOwnershipFailed > 0) false
       else true
     }
 
-    private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
+    private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
+                                      topicDirs: ZKGroupTopicDirs, partitionString: String,
                                       topic: String, consumerThreadId: String) {
       val partition = Partition.parse(partitionString)
-      val partTopicInfoMap = topicRegistry.get(topic)
+      val partTopicInfoMap = currentTopicRegistry.get(topic)
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
       val offsetString = readDataMaybeNull(zkClient, znode)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Thu Feb  9 22:04:32 2012
@@ -49,7 +49,7 @@ class GZIPCompression(inputStream: Input
   }
 
   override def read(a: Array[Byte]): Int = {
-    gzipIn.read(a)	
+    gzipIn.read(a)
   }
 }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Feb  9 22:04:32 2012
@@ -18,13 +18,13 @@
 package kafka.producer
 
 import async.MissingConfigException
-import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.Message
 import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu Feb  9 22:04:32 2012
@@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends L
       // check if the rebalancing operation succeeded.
       try {
         if(validateRebalancingOperation(zkClient, group))
-          info("Rebalance operation successful !")
+          println("Rebalance operation successful !")
         else
-          error("Rebalance operation failed !")
+          println("Rebalance operation failed !")
       } catch {
         case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
       }
@@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends L
 
     rebalanceSucceeded
   }
-
-
-}
\ No newline at end of file
+}

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala Thu Feb  9 22:04:32 2012
@@ -18,7 +18,7 @@
 package kafka
 
 import message.Message
-import org.apache.log4j.{Logger, PropertyConfigurator}
+import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder
 

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Thu Feb  9 22:04:32 2012
@@ -17,8 +17,6 @@
 
 package kafka
 
-import java.net.URI
-import java.util.Arrays.asList
 import java.io._
 import java.nio._
 import java.nio.channels._

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Thu Feb  9 22:04:32 2012
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.log._
 import kafka.message._
 import kafka.utils.{TestUtils, Utils}
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Thu Feb  9 22:04:32 2012
@@ -22,10 +22,9 @@ import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.Logger
 import java.util.Properties
 import kafka.consumer.SimpleConsumer
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.api.{OffsetRequest, FetchRequest}
 import junit.framework.Assert._
-import java.io.File
 
 class BackwardsCompatibilityTest extends JUnit3Suite {
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Thu Feb  9 22:04:32 2012
@@ -17,11 +17,6 @@
 
 package kafka.integration
 
-import java.util.Properties
-import junit.framework.Assert._
-import kafka.producer._
-import kafka.consumer._
-import kafka.message._
 import kafka.server._
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Thu Feb  9 22:04:32 2012
@@ -35,7 +35,6 @@ trait BaseMessageSetTestCases extends JU
 
   @Test
   def testWrittenEqualsRead {
-    import scala.collection.JavaConversions._
     val messageSet = createMessageSet(messages)
     TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
   }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Thu Feb  9 22:04:32 2012
@@ -18,7 +18,6 @@
 package kafka.javaapi.message
 
 import java.nio._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Thu Feb  9 22:04:32 2012
@@ -24,7 +24,6 @@ import kafka.zk.EmbeddedZookeeper
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import kafka.utils.Utils
 import java.util.concurrent.ConcurrentHashMap
@@ -34,7 +33,7 @@ import org.scalatest.junit.JUnitSuite
 import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
 import kafka.producer.ProducerPool
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.producer.async.AsyncProducer
 import kafka.javaapi.Implicits._
 import kafka.serializer.{StringEncoder, Encoder}
 import kafka.javaapi.consumer.SimpleConsumer

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Thu Feb  9 22:04:32 2012
@@ -17,11 +17,11 @@
 
 package kafka.javaapi.producer
 
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
 import kafka.utils.SystemTime
 import kafka.utils.TestUtils
 import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
+import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import java.util.Properties

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Thu Feb  9 22:04:32 2012
@@ -18,14 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.nio._
 import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
 import kafka.common.OffsetOutOfRangeException
-import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 
 class LogTest extends JUnitSuite {
   

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Feb  9 22:04:32 2012
@@ -23,13 +23,12 @@ import java.util.Properties
 import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
 import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging}
 import kafka.zk.EmbeddedZookeeper
 import junit.framework.Assert._
 import kafka.api.FetchRequest
 import kafka.serializer.Encoder
-import kafka.message.{MessageSet, Message}
+import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Thu Feb  9 22:04:32 2012
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import java.util.Arrays
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.scalatest.junit.JUnitSuite

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Thu Feb  9 22:04:32 2012
@@ -18,12 +18,9 @@
 package kafka.message
 
 import java.nio._
-import java.util.Arrays
-import junit.framework.TestCase
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.junit.Test
-import kafka.message._
 
 class FileMessageSetTest extends BaseMessageSetTestCases {
   

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala Thu Feb  9 22:04:32 2012
@@ -19,9 +19,6 @@ package kafka.message
 
 import java.util._
 import java.nio._
-import java.nio.channels._
-import java.io._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu Feb  9 22:04:32 2012
@@ -19,13 +19,9 @@ package kafka.network;
 
 import java.net._
 import java.io._
-import java.nio._
-import java.nio.channels._
 import org.junit._
-import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import kafka.utils.TestUtils
-import kafka.network._
 import java.util.Random
 import org.apache.log4j._
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Feb  9 22:04:32 2012
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
 import java.util.Properties
 import org.easymock.EasyMock
 import kafka.api.ProducerRequest

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Thu Feb  9 22:04:32 2012
@@ -17,14 +17,13 @@
 
 package kafka.producer
 
-import async.{AsyncProducerConfig, AsyncProducer}
+import async.AsyncProducer
 import java.util.Properties
 import org.apache.log4j.{Logger, Level}
 import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import java.util.concurrent.ConcurrentHashMap
 import kafka.cluster.Partition

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Thu Feb  9 22:04:32 2012
@@ -17,11 +17,11 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
 import kafka.utils.SystemTime
 import kafka.utils.TestUtils
 import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
+import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.common.MessageSizeTooLargeException

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1242552&r1=1242551&r2=1242552&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Feb  9 22:04:32 2012
@@ -20,13 +20,11 @@ import kafka.utils.TestUtils
 import java.io.File
 import kafka.utils.Utils
 import kafka.api.FetchRequest
-import kafka.integration.ProducerConsumerTestHarness
 import kafka.producer.{SyncProducer, SyncProducerConfig}
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
 import org.scalatest.junit.JUnitSuite
-import junit.framework.{Assert, TestCase}
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import junit.framework.Assert._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}