You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/08/07 18:29:56 UTC
svn commit: r1154719 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/consumer/ main/scala/kafka/producer/
main/scala/kafka/server/ main/scala/kafka/tools/ main/scala/kafka/utils/
test/scala/other/kafka/ test/scala/unit/kafka/utils/ test/scala/un...
Author: junrao
Date: Sun Aug 7 16:29:55 2011
New Revision: 1154719
URL: http://svn.apache.org/viewvc?rev=1154719&view=rev
Log:
Commit offsets before consumer shutdown (KAFKA-84); rename utils.StringSerializer to ZKStringSerializer to avoid confusion with producer.StringSerializer
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sun Aug 7 16:29:55 2011
@@ -28,7 +28,7 @@ import java.io.PrintStream
import kafka.message._
import kafka.utils.Utils
import kafka.utils.ZkUtils
-import kafka.utils.StringSerializer
+import kafka.utils.ZKStringSerializer
/**
* Consumer that dumps messages out to standard out.
@@ -200,7 +200,7 @@ object ConsoleConsumer {
try {
val dir = "/consumers/" + groupId
logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sun Aug 7 16:29:55 2011
@@ -92,8 +92,8 @@ private[kafka] class ZookeeperConsumerCo
// queues : (topic,consumerThreadId) -> queue
private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
- connectZk
- createFetcher
+ connectZk()
+ createFetcher()
if (config.autoCommit) {
logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
@@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerCo
private def connectZk() {
logger.info("Connecting to zookeeper instance at " + config.zkConnect)
- zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+ zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
}
def shutdown() {
@@ -120,12 +120,14 @@ private[kafka] class ZookeeperConsumerCo
if (canShutdown) {
logger.info("ZKConsumerConnector shutting down")
try {
- scheduler.shutdown
+ scheduler.shutdown()
fetcher match {
- case Some(f) => f.shutdown
+ case Some(f) => f.shutdown()
case None =>
}
- sendShudownToAllQueues
+ sendShudownToAllQueues()
+ if (config.autoCommit)
+ commitOffsets()
if (zkClient != null) {
zkClient.close()
zkClient = null
@@ -186,7 +188,7 @@ private[kafka] class ZookeeperConsumerCo
new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
// explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance
+ loadBalancerListener.syncedRebalance()
ret
}
@@ -199,7 +201,7 @@ private[kafka] class ZookeeperConsumerCo
private def sendShudownToAllQueues() = {
for (queue <- queues.values) {
logger.debug("Clearing up queue")
- queue.clear
+ queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
logger.debug("Cleared queue and sent shutdown command")
}
@@ -209,7 +211,7 @@ private[kafka] class ZookeeperConsumerCo
if(logger.isTraceEnabled)
logger.trace("auto committing")
try {
- commitOffsets
+ commitOffsets()
}
catch {
case t: Throwable =>
@@ -419,7 +421,7 @@ private[kafka] class ZookeeperConsumerCo
logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
try {
- done = rebalance
+ done = rebalance()
}
catch {
case e =>
@@ -432,8 +434,8 @@ private[kafka] class ZookeeperConsumerCo
if (done)
return
// release all partitions, reset state and retry
- releasePartitionOwnership
- resetState
+ releasePartitionOwnership()
+ resetState()
Thread.sleep(config.zkSyncTimeMs)
}
}
@@ -462,7 +464,7 @@ private[kafka] class ZookeeperConsumerCo
commitOffsets
logger.info("Releasing partition ownership")
- releasePartitionOwnership
+ releasePartitionOwnership()
val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Sun Aug 7 16:29:55 2011
@@ -15,7 +15,7 @@
*/
package kafka.producer
-import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
+import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
import collection.mutable.HashMap
import collection.mutable.Map
import org.apache.log4j.Logger
@@ -59,7 +59,7 @@ private[producer] class ZKBrokerPartitio
private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
private val zkWatcherLock = new Object
private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- StringSerializer)
+ ZKStringSerializer)
// maintain a map from topic -> list of (broker, num_partitions) from zookeeper
private var topicBrokerPartitions = getZKTopicPartitionInfo
// maintain a map from broker id to the corresponding Broker object
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Sun Aug 7 16:29:55 2011
@@ -42,7 +42,7 @@ class KafkaZooKeeper(config: KafkaConfig
def startup() {
/* start client */
logger.info("connecting to ZK: " + config.zkConnect)
- zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+ zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
zkClient.subscribeStateChanges(new SessionExpireListener)
}
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Sun Aug 7 16:29:55 2011
@@ -9,7 +9,7 @@ import kafka.producer.async.DefaultEvent
import kafka.serializer.{DefaultEncoder, StringEncoder}
import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
import kafka.consumer._
-import kafka.utils.{StringSerializer, Utils}
+import kafka.utils.{ZKStringSerializer, Utils}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient._
import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
@@ -131,7 +131,7 @@ object ReplayLogProducer {
try {
val dir = "/consumers/" + groupId
logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Sun Aug 7 16:29:55 2011
@@ -33,7 +33,7 @@ object UpdateOffsetsInZK {
usage
val config = new ConsumerConfig(Utils.loadProps(args(1)))
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, StringSerializer)
+ config.zkConnectionTimeoutMs, ZKStringSerializer)
args(0) match {
case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Sun Aug 7 16:29:55 2011
@@ -239,7 +239,7 @@ object ZkUtils {
}
}
-object StringSerializer extends ZkSerializer {
+object ZKStringSerializer extends ZkSerializer {
@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala Sun Aug 7 16:29:55 2011
@@ -17,7 +17,7 @@
package kafka
import consumer.ConsumerConfig
-import utils.{StringSerializer, ZkUtils, Utils}
+import utils.{ZKStringSerializer, ZkUtils, Utils}
import org.I0Itec.zkclient.ZkClient
object DeleteZKPath {
@@ -31,7 +31,7 @@ object DeleteZKPath {
val zkPath = args(1)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- StringSerializer)
+ ZKStringSerializer)
try {
ZkUtils.deletePathRecursive(zkClient, zkPath);
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sun Aug 7 16:29:55 2011
@@ -209,7 +209,7 @@ object TestUtils {
* Throw an exception if an iterable has different length than expected
*
*/
- def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
+ def checkLength[T](s1: Iterator[T], expectedLength:Int) {
var n = 0
while (s1.hasNext) {
n+=1
@@ -283,7 +283,7 @@ object TestUtils {
}
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+ val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Aug 7 16:29:55 2011
@@ -21,7 +21,7 @@ import org.apache.zookeeper.server.NIOSe
import kafka.utils.TestUtils
import org.I0Itec.zkclient.ZkClient
import java.net.InetSocketAddress
-import kafka.utils.{Utils, StringSerializer}
+import kafka.utils.{Utils, ZKStringSerializer}
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
@@ -31,7 +31,7 @@ class EmbeddedZookeeper(val connectStrin
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
factory.startup(zookeeper)
val client = new ZkClient(connectString)
- client.setZkSerializer(StringSerializer)
+ client.setZkSerializer(ZKStringSerializer)
def shutdown() {
factory.shutdown()
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1154719&r1=1154718&r2=1154719&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Sun Aug 7 16:29:55 2011
@@ -18,7 +18,7 @@ package kafka.zk
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, StringSerializer}
+import kafka.utils.{ZkUtils, ZKStringSerializer}
import kafka.utils.{TestZKUtils, TestUtils}
import org.junit.Assert
import org.scalatest.junit.JUnit3Suite
@@ -30,7 +30,7 @@ class ZKEphemeralTest extends JUnit3Suit
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- StringSerializer)
+ ZKStringSerializer)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -48,7 +48,7 @@ class ZKEphemeralTest extends JUnit3Suit
Thread.sleep(zkSessionTimeoutMs)
zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- StringSerializer)
+ ZKStringSerializer)
val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
Assert.assertFalse(nodeExists)