You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/03 06:57:07 UTC

svn commit: r1178298 - in /incubator/kafka/trunk/core/src/main/scala/kafka/consumer: ConsumerIterator.scala ConsumerTopicStat.scala KafkaMessageStream.scala ZookeeperConsumerConnector.scala

Author: junrao
Date: Mon Oct  3 04:57:06 2011
New Revision: 1178298

URL: http://svn.apache.org/viewvc?rev=1178298&view=rev
Log:
JMX bean that reports #message/sec in consumer; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-136

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Mon Oct  3 04:57:06 2011
@@ -29,7 +29,8 @@ import kafka.serializer.Decoder
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  * 
  */
-class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+class ConsumerIterator[T](private val topic: String,
+                          private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
         extends IteratorTemplate[T] {
@@ -47,6 +48,7 @@ class ConsumerIterator[T](private val ch
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     if(logger.isTraceEnabled)
       logger.trace("Setting consumed offset to %d".format(consumedOffset))
+    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
     decodedMessage
   }
 

Added: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1178298&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Mon Oct  3 04:57:06 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.log4j.Logger
+import kafka.utils.{Pool, Utils, threadsafe}
+
+trait ConsumerTopicStatMBean {
+  def getMessagesPerTopic: Long
+}
+
+@threadsafe
+class ConsumerTopicStat extends ConsumerTopicStatMBean {
+  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
+
+  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
+
+  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+}
+
+object ConsumerTopicStat {
+  private val logger = Logger.getLogger(getClass())
+  private val stats = new Pool[String, ConsumerTopicStat]
+
+  def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
+    var stat = stats.get(topic)
+    if (stat == null) {
+      stat = new ConsumerTopicStat
+      if (stats.putIfNotExists(topic, stat) == null)
+        Utils.swallow(logger.warn, Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic))
+      else
+        stat = stats.get(topic)
+    }
+    return stat
+  }
+}

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Mon Oct  3 04:57:06 2011
@@ -26,14 +26,15 @@ import kafka.serializer.{DefaultDecoder,
  * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+class KafkaMessageStream[T](val topic: String,
+                            private val queue: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val decoder: Decoder[T])
    extends Iterable[T] with java.lang.Iterable[T]{
 
   private val logger = Logger.getLogger(getClass())
   private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
+    new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
     
   /**
    *  Create an iterator over messages in the stream.

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1178298&r1=1178297&r2=1178298&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Oct  3 04:57:06 2011
@@ -186,7 +186,7 @@ private[kafka] class ZookeeperConsumerCo
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
+        streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       logger.debug("adding topic " + topic + " and stream to map..")