You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/07 18:29:56 UTC

svn commit: r1154719 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/producer/ main/scala/kafka/server/ main/scala/kafka/tools/ main/scala/kafka/utils/ test/scala/other/kafka/ test/scala/unit/kafka/utils/ test/scala/un...

Author: junrao
Date: Sun Aug  7 16:29:55 2011
New Revision: 1154719

URL: http://svn.apache.org/viewvc?rev=1154719&view=rev
Log:
Commit offsets before consumer shutdown (KAFKA-84); rename utils.StringSerializer to ZKStringSerializer to avoid confusion with producer.StringSerializer

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sun Aug  7 16:29:55 2011
@@ -28,7 +28,7 @@ import java.io.PrintStream
 import kafka.message._
 import kafka.utils.Utils
 import kafka.utils.ZkUtils
-import kafka.utils.StringSerializer
+import kafka.utils.ZKStringSerializer
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -200,7 +200,7 @@ object ConsoleConsumer {
     try {
       val dir = "/consumers/" + groupId
       logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
     } catch {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sun Aug  7 16:29:55 2011
@@ -92,8 +92,8 @@ private[kafka] class ZookeeperConsumerCo
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
-  connectZk
-  createFetcher
+  connectZk()
+  createFetcher()
   if (config.autoCommit) {
     logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
@@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerCo
 
   private def connectZk() {
     logger.info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   }
 
   def shutdown() {
@@ -120,12 +120,14 @@ private[kafka] class ZookeeperConsumerCo
     if (canShutdown) {
       logger.info("ZKConsumerConnector shutting down")
       try {
-        scheduler.shutdown
+        scheduler.shutdown()
         fetcher match {
-          case Some(f) => f.shutdown
+          case Some(f) => f.shutdown()
           case None =>
         }
-        sendShudownToAllQueues
+        sendShudownToAllQueues()
+        if (config.autoCommit)
+          commitOffsets()
         if (zkClient != null) {
           zkClient.close()
           zkClient = null
@@ -186,7 +188,7 @@ private[kafka] class ZookeeperConsumerCo
       new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
 
     // explicitly trigger load balancing for this consumer
-    loadBalancerListener.syncedRebalance
+    loadBalancerListener.syncedRebalance()
     ret
   }
 
@@ -199,7 +201,7 @@ private[kafka] class ZookeeperConsumerCo
   private def sendShudownToAllQueues() = {
     for (queue <- queues.values) {
       logger.debug("Clearing up queue")
-      queue.clear
+      queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)
       logger.debug("Cleared queue and sent shutdown command")
     }
@@ -209,7 +211,7 @@ private[kafka] class ZookeeperConsumerCo
     if(logger.isTraceEnabled)
       logger.trace("auto committing")
     try {
-      commitOffsets
+      commitOffsets()
     }
     catch {
       case t: Throwable =>
@@ -419,7 +421,7 @@ private[kafka] class ZookeeperConsumerCo
           logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
           try {
-            done = rebalance
+            done = rebalance()
           }
           catch {
             case e =>
@@ -432,8 +434,8 @@ private[kafka] class ZookeeperConsumerCo
           if (done)
             return
           // release all partitions, reset state and retry
-          releasePartitionOwnership
-          resetState
+          releasePartitionOwnership()
+          resetState()
           Thread.sleep(config.zkSyncTimeMs)
         }
       }
@@ -462,7 +464,7 @@ private[kafka] class ZookeeperConsumerCo
       commitOffsets
 
       logger.info("Releasing partition ownership")
-      releasePartitionOwnership
+      releasePartitionOwnership()
 
       val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Sun Aug  7 16:29:55 2011
@@ -15,7 +15,7 @@
 */
 package kafka.producer
 
-import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
+import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import org.apache.log4j.Logger
@@ -59,7 +59,7 @@ private[producer] class ZKBrokerPartitio
   private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
   private val zkWatcherLock = new Object
   private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-    StringSerializer)
+    ZKStringSerializer)
   // maintain a map from topic -> list of (broker, num_partitions) from zookeeper
   private var topicBrokerPartitions = getZKTopicPartitionInfo
   // maintain a map from broker id to the corresponding Broker object

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Sun Aug  7 16:29:55 2011
@@ -42,7 +42,7 @@ class KafkaZooKeeper(config: KafkaConfig
   def startup() {
     /* start client */
     logger.info("connecting to ZK: " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     zkClient.subscribeStateChanges(new SessionExpireListener)
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Sun Aug  7 16:29:55 2011
@@ -9,7 +9,7 @@ import kafka.producer.async.DefaultEvent
 import kafka.serializer.{DefaultEncoder, StringEncoder}
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{StringSerializer, Utils}
+import kafka.utils.{ZKStringSerializer, Utils}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
 import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
@@ -131,7 +131,7 @@ object ReplayLogProducer {
     try {
       val dir = "/consumers/" + groupId
       logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
     } catch {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Sun Aug  7 16:29:55 2011
@@ -33,7 +33,7 @@ object UpdateOffsetsInZK {
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-        config.zkConnectionTimeoutMs, StringSerializer)
+        config.zkConnectionTimeoutMs, ZKStringSerializer)
     args(0) match {
       case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
       case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Sun Aug  7 16:29:55 2011
@@ -239,7 +239,7 @@ object ZkUtils {
   }
 }
 
-object StringSerializer extends ZkSerializer {
+object ZKStringSerializer extends ZkSerializer {
 
   @throws(classOf[ZkMarshallingError])
   def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala Sun Aug  7 16:29:55 2011
@@ -17,7 +17,7 @@
 package kafka
 
 import consumer.ConsumerConfig
-import utils.{StringSerializer, ZkUtils, Utils}
+import utils.{ZKStringSerializer, ZkUtils, Utils}
 import org.I0Itec.zkclient.ZkClient
 
 object DeleteZKPath {
@@ -31,7 +31,7 @@ object DeleteZKPath {
     val zkPath = args(1)
 
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      StringSerializer)
+      ZKStringSerializer)
 
     try {
       ZkUtils.deletePathRecursive(zkClient, zkPath);

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sun Aug  7 16:29:55 2011
@@ -209,7 +209,7 @@ object TestUtils {
    *  Throw an exception if an iterable has different length than expected
    *  
    */
-  def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
+  def checkLength[T](s1: Iterator[T], expectedLength:Int) {
     var n = 0
     while (s1.hasNext) {
       n+=1
@@ -283,7 +283,7 @@ object TestUtils {
   }
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
 
   }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Aug  7 16:29:55 2011
@@ -21,7 +21,7 @@ import org.apache.zookeeper.server.NIOSe
 import kafka.utils.TestUtils
 import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, StringSerializer}
+import kafka.utils.{Utils, ZKStringSerializer}
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -31,7 +31,7 @@ class EmbeddedZookeeper(val connectStrin
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
   val client = new ZkClient(connectString)
-  client.setZkSerializer(StringSerializer)
+  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Sun Aug  7 16:29:55 2011
@@ -18,7 +18,7 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, StringSerializer}
+import kafka.utils.{ZkUtils, ZKStringSerializer}
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.Assert
 import org.scalatest.junit.JUnit3Suite
@@ -30,7 +30,7 @@ class ZKEphemeralTest extends JUnit3Suit
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                StringSerializer)
+                                ZKStringSerializer)
 
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -48,7 +48,7 @@ class ZKEphemeralTest extends JUnit3Suit
     Thread.sleep(zkSessionTimeoutMs)
 
     zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                StringSerializer)
+                                ZKStringSerializer)
 
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)