You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/09/20 23:52:43 UTC

[2/2] git commit: SAMZA-25; adding metrics to everything.

SAMZA-25; adding metrics to everything.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bb0abb67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bb0abb67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bb0abb67

Branch: refs/heads/master
Commit: bb0abb670892806d46c64d158cabae8990aceb09
Parents: 819b996
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Fri Sep 20 14:52:21 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Fri Sep 20 14:52:21 2013 -0700

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../apache/samza/metrics/MetricsRegistry.java   |  4 ++
 .../apache/samza/util/NoOpMetricsRegistry.java  | 10 ++++
 .../apache/samza/container/SamzaContainer.scala | 32 ++++++++--
 .../samza/container/SamzaContainerMetrics.scala | 15 +++--
 .../apache/samza/container/TaskInstance.scala   | 20 +++++++
 .../samza/container/TaskInstanceMetrics.scala   | 21 +++++--
 .../org/apache/samza/metrics/JvmMetrics.scala   | 32 +++++-----
 .../apache/samza/metrics/MetricsHelper.scala    | 50 ++++++++++++++++
 .../samza/metrics/MetricsRegistryMap.scala      | 33 ++++++++---
 .../apache/samza/system/SystemConsumers.scala   | 25 +++++++-
 .../samza/system/SystemConsumersMetrics.scala   | 61 +++++++++++++++++++
 .../apache/samza/system/SystemProducers.scala   | 24 +++++++-
 .../samza/system/SystemProducersMetrics.scala   | 37 ++++++++++++
 .../samza/metrics/TestMetricsHelper.scala       | 31 ++++++++++
 .../metrics/reporter/TestJmxReporter.scala      |  4 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 24 ++++----
 .../samza/system/kafka/BrokerProxyMetrics.scala | 52 ----------------
 .../system/kafka/KafkaSystemConsumer.scala      | 10 ++--
 .../kafka/KafkaSystemConsumerMetrics.scala      | 62 ++++++++++++++++++++
 .../samza/system/kafka/KafkaSystemFactory.scala |  8 ++-
 .../system/kafka/KafkaSystemProducer.scala      | 21 ++++---
 .../kafka/KafkaSystemProducerMetrics.scala      | 41 +++++++++++++
 .../system/kafka/TopicAndPartitionMetrics.scala | 56 ------------------
 .../samza/system/kafka/TestBrokerProxy.scala    | 31 ++++------
 .../system/kafka/TestKafkaSystemProducer.scala  | 16 ++---
 .../apache/samza/storage/kv/CachedStore.scala   | 26 ++++++--
 .../samza/storage/kv/CachedStoreMetrics.scala   | 49 ++++++++++++++++
 .../storage/kv/KeyValueStorageEngine.scala      | 45 +++++++++++---
 .../kv/KeyValueStorageEngineFactory.scala       | 15 +++--
 .../kv/KeyValueStorageEngineMetrics.scala       | 42 +++++++++++++
 .../samza/storage/kv/LevelDbKeyValueStore.scala | 49 ++++++++++++----
 .../kv/LevelDbKeyValueStoreMetrics.scala        | 41 +++++++++++++
 .../apache/samza/storage/kv/LoggedStore.scala   | 48 ++++++++++-----
 .../samza/storage/kv/LoggedStoreMetrics.scala   | 39 ++++++++++++
 .../storage/kv/SerializedKeyValueStore.scala    | 38 ++++++++++--
 .../kv/SerializedKeyValueStoreMetrics.scala     | 41 +++++++++++++
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  | 32 +++++-----
 38 files changed, 908 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4c4e99c..077e44b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -154,6 +154,7 @@ project(":samza-kv_$scalaVersion") {
 
   dependencies {
     compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
     compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index a4563c4..9df1ef6 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -22,5 +22,9 @@ package org.apache.samza.metrics;
 public interface MetricsRegistry {
   Counter newCounter(String group, String name);
 
+  Counter newCounter(String group, Counter counter);
+
   <T> Gauge<T> newGauge(String group, String name, T value);
+
+  <T> Gauge<T> newGauge(String group, Gauge<T> value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index c071c47..8bc0764 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -30,7 +30,17 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
+  public Counter newCounter(String group, Counter counter) {
+    return counter;
+  }
+
+  @Override
   public <T> Gauge<T> newGauge(String group, String name, T value) {
     return new Gauge<T>(name, value);
   }
+
+  @Override
+  public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+    return gauge;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2d2efdd..62bd243 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -55,6 +55,9 @@ import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.DefaultChooser
 import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducersMetrics
+import org.apache.samza.system.SystemConsumersMetrics
+import org.apache.samza.metrics.MetricsRegistryMap
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -83,7 +86,10 @@ object SamzaContainer extends Logging {
     info("Using partitions: %s" format partitions)
     info("Using configuration: %s" format config)
 
-    val samzaContainerMetrics = new SamzaContainerMetrics(containerName)
+    val registry = new MetricsRegistryMap(containerName)
+    val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
+    val systemProducersMetrics = new SystemProducersMetrics(registry)
+    val systemConsumersMetrics = new SystemConsumersMetrics(registry)
 
     val inputStreams = config.getInputStreams
     val inputSystems = inputStreams.map(_.getSystem)
@@ -261,11 +267,13 @@ object SamzaContainer extends Logging {
       // TODO add config values for no new message timeout and max msgs per stream partition
       chooser = chooser,
       consumers = consumers,
-      serdeManager = serdeManager)
+      serdeManager = serdeManager,
+      metrics = systemConsumersMetrics)
 
     val producerMultiplexer = new SystemProducers(
       producers = producers,
-      serdeManager = serdeManager)
+      serdeManager = serdeManager,
+      metrics = systemProducersMetrics)
 
     val listeners = config.getLifecycleListeners match {
       case Some(listeners) => {
@@ -324,7 +332,7 @@ object SamzaContainer extends Logging {
 
       val collector = new ReadableCollector
 
-      val taskInstanceMetrics = new TaskInstanceMetrics(partition)
+      val taskInstanceMetrics = new TaskInstanceMetrics("Partition-%s" format partition.getPartitionId)
 
       val storeConsumers = changeLogSystemStreams
         .map {
@@ -402,8 +410,8 @@ object SamzaContainer extends Logging {
       config = config,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      checkpointManager = checkpointManager,
       metrics = samzaContainerMetrics,
+      checkpointManager = checkpointManager,
       reporters = reporters,
       jvm = jvm)
   }
@@ -414,8 +422,8 @@ class SamzaContainer(
   config: Config,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
+  metrics: SamzaContainerMetrics,
   checkpointManager: CheckpointManager = null,
-  metrics: SamzaContainerMetrics = new SamzaContainerMetrics,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null) extends Runnable with Logging {
 
@@ -530,6 +538,8 @@ class SamzaContainer(
   def process(coordinator: ReadableCoordinator) {
     trace("Attempting to choose a message to process.")
 
+    metrics.processes.inc
+
     val envelope = consumerMultiplexer.choose
 
     if (envelope != null) {
@@ -537,27 +547,37 @@ class SamzaContainer(
 
       trace("Processing incoming message envelope for partition %s." format partition)
 
+      metrics.envelopes.inc
+
       taskInstances(partition).process(envelope, coordinator)
     } else {
       trace("No incoming message envelope was available.")
+
+      metrics.nullEnvelopes.inc
     }
   }
 
   def window(coordinator: ReadableCoordinator) {
     trace("Windowing stream tasks.")
 
+    metrics.windows.inc
+
     taskInstances.values.foreach(_.window(coordinator))
   }
 
   def send {
     trace("Triggering send in task instances.")
 
+    metrics.sends.inc
+
     taskInstances.values.foreach(_.send)
   }
 
   def commit(coordinator: ReadableCoordinator) {
     trace("Committing task instances.")
 
+    metrics.commits.inc
+
     taskInstances.values.foreach(_.commit(coordinator))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 81cf356..bcb3fa3 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -21,13 +21,16 @@ package org.apache.samza.container
 
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsHelper
 
 class SamzaContainerMetrics(
-  val containerName: String = "unnamed-container",
-  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+  val source: String = "unknown",
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
 
-  val source = containerName
-  val commits = registry.newCounter("samza.task.SamzaContainer", "commits")
-
-  // TODO .. etc ..
+  val commits = newCounter("commit-calls")
+  val windows = newCounter("window-calls")
+  val processes = newCounter("process-calls")
+  val sends = newCounter("send-calls")
+  val envelopes = newCounter("process-envelopes")
+  val nullEnvelopes = newCounter("process-null-envelopes")
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c61994d..d7eff62 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -42,6 +42,7 @@ import org.apache.samza.task.ReadableCollector
 import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.metrics.Gauge
 
 class TaskInstance(
   task: StreamTask,
@@ -136,6 +137,8 @@ class TaskInstance(
         for ((systemStream, offset) <- checkpoint.getOffsets) {
           if (!resetInputStreams.getOrElse(systemStream, false)) {
             offsets += systemStream -> offset
+
+            metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
           } else {
             info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream))
           }
@@ -156,6 +159,8 @@ class TaskInstance(
   }
 
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
+    metrics.processes.inc
+
     listeners.foreach(_.beforeProcess(envelope, config, context))
 
     trace("Processing incoming message envelope for partition: %s, %s" format (partition, envelope.getSystemStreamPartition))
@@ -173,12 +178,16 @@ class TaskInstance(
     if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < clock()) {
       trace("Windowing for partition: %s" format partition)
 
+      metrics.windows.inc
+
       task.asInstanceOf[WindowableTask].window(collector, coordinator)
       lastWindowMs = clock()
 
       trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs))
     } else {
       trace("Skipping window for partition: %s" format partition)
+
+      metrics.windowsSkipped.inc
     }
   }
 
@@ -186,6 +195,9 @@ class TaskInstance(
     if (collector.envelopes.size > 0) {
       trace("Sending messages for partition: %s, %s" format (partition, collector.envelopes.size))
 
+      metrics.sends.inc
+      metrics.messagesSent.inc(collector.envelopes.size)
+
       collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
 
       trace("Resetting collector for partition: %s" format partition)
@@ -193,6 +205,8 @@ class TaskInstance(
       collector.reset
     } else {
       trace("Skipping send for partition %s because no messages were collected." format partition)
+
+      metrics.sendsSkipped.inc
     }
   }
 
@@ -200,6 +214,8 @@ class TaskInstance(
     if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || coordinator.isShutdownRequested) {
       trace("Flushing state stores for partition: %s" format partition)
 
+      metrics.commits.inc
+
       storageManager.flush
 
       trace("Flushing producers for partition: %s" format partition)
@@ -213,6 +229,10 @@ class TaskInstance(
       }
 
       lastCommitMs = clock()
+    } else {
+      trace("Skipping commit for partition: %s" format partition)
+
+      metrics.commitsSkipped.inc
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 07d72c7..46f1f17 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -22,13 +22,24 @@ package org.apache.samza.container
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.Partition
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.system.SystemStream
+import org.apache.samza.metrics.Gauge
 
 class TaskInstanceMetrics(
-  val partition: Partition,
-  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+  val source: String = "unknown",
+  val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
 
-  val source = "Partition-%s" format partition.getPartitionId
-  val commits = registry.newCounter("samza.task.TaskInstance", "commits")
+  val commits = newCounter("commit-calls")
+  val commitsSkipped = newCounter("commit-skipped")
+  val windows = newCounter("window-calls")
+  val windowsSkipped = newCounter("window-skipped")
+  val processes = newCounter("process-calls")
+  val sends = newCounter("send-calls")
+  val sendsSkipped = newCounter("send-skipped")
+  val messagesSent = newCounter("messages-sent")
 
-  // TODO .. etc ..
+  def addOffsetGauge(systemStream: SystemStream, getValue: () => String) {
+    newGauge("%s-%s-offset" format (systemStream.getSystem, systemStream.getStream), getValue)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index 164a2ee..301a5a0 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -32,10 +32,8 @@ import org.apache.samza.util.DaemonThreadFactory
 /**
  * Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
  */
-class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with Logging {
-  final val M = 1024 * 1024.0f
-
-  def this(registry: MetricsRegistry) = this("samza.jvm", registry)
+class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runnable with Logging {
+  final val M = 1024 * 1024.0F
 
   val memoryMXBean = ManagementFactory.getMemoryMXBean()
   val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
@@ -44,18 +42,18 @@ class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with
   val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
 
   // jvm metrics
-  val gMemNonHeapUsedM = registry.newGauge[Float](group, "MemNonHeapUsedM", 0)
-  val gMemNonHeapCommittedM = registry.newGauge[Float](group, "MemNonHeapCommittedM", 0)
-  val gMemHeapUsedM = registry.newGauge[Float](group, "MemHeapUsedM", 0)
-  val gMemHeapCommittedM = registry.newGauge[Float](group, "MemHeapCommittedM", 0)
-  val gThreadsNew = registry.newGauge[Long](group, "ThreadsNew", 0)
-  val gThreadsRunnable = registry.newGauge[Long](group, "ThreadsRunnable", 0)
-  val gThreadsBlocked = registry.newGauge[Long](group, "ThreadsBlocked", 0)
-  val gThreadsWaiting = registry.newGauge[Long](group, "ThreadsWaiting", 0)
-  val gThreadsTimedWaiting = registry.newGauge[Long](group, "ThreadsTimedWaiting", 0)
-  val gThreadsTerminated = registry.newGauge[Long](group, "ThreadsTerminated", 0)
-  val cGcCount = registry.newCounter(group, "GcCount")
-  val cGcTimeMillis = registry.newCounter(group, "GcTimeMillis")
+  val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)
+  val gMemNonHeapCommittedM = newGauge("mem-non-heap-committed-mb", 0.0F)
+  val gMemHeapUsedM = newGauge("mem-heap-used-mb", 0.0F)
+  val gMemHeapCommittedM = newGauge("mem-heap-committed-mb", 0.0F)
+  val gThreadsNew = newGauge("threads-new", 0L)
+  val gThreadsRunnable = newGauge("threads-runnable", 0L)
+  val gThreadsBlocked = newGauge("threads-blocked", 0L)
+  val gThreadsWaiting = newGauge("threads-waiting", 0L)
+  val gThreadsTimedWaiting = newGauge("threads-timed-waiting", 0L)
+  val gThreadsTerminated = newGauge("threads-terminated", 0L)
+  val cGcCount = newCounter("gc-count")
+  val cGcTimeMillis = newCounter("gc-time-millis")
 
   def start {
     executor.scheduleWithFixedDelay(this, 0, 5, TimeUnit.SECONDS)
@@ -107,7 +105,7 @@ class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with
     gcBeanCounters.get(gcName) match {
       case Some(gcBeanCounterTuple) => gcBeanCounterTuple
       case _ => {
-        val t = (registry.newCounter(group, "GcCount" + gcName), registry.newCounter(group, "GcTimeMillis" + gcName))
+        val t = (newCounter("%s-gc-count" format gcName), newCounter("%s-gc-time-millis" format gcName))
         gcBeanCounters += (gcName -> t)
         t
       }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
new file mode 100644
index 0000000..b412e46
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.apache.samza.container.SamzaContainerMetrics
+
+/**
+ * MetricsHelper is a little helper class to make it easy to register and
+ * manage counters and gauges.
+ */
+trait MetricsHelper {
+  val group = this.getClass.getName
+  val registry: MetricsRegistry
+
+  def newCounter(name: String) = {
+    registry.newCounter(group, (getPrefix + name).toLowerCase)
+  }
+
+  def newGauge[T](name: String, value: T) = {
+    registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value))
+  }
+
+  def newGauge[T](name: String, value: () => T) = {
+    registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) {
+      override def getValue = value()
+    })
+  }
+
+  /**
+   * Returns a prefix for metric names.
+   */
+  def getPrefix = ""
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index fc0bd38..da83f20 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.metrics
+
 import grizzled.slf4j.Logging
 import java.util.concurrent.ConcurrentHashMap
 
@@ -25,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap
  * A class that holds all metrics registered with it. It can be registered
  * with one or more MetricReporters to flush metrics.
  */
-class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
+class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with Logging {
   var listeners = Set[ReadableMetricsRegistryListener]()
 
   /*
@@ -33,20 +34,32 @@ class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
    */
   val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String, Metric]]
 
+  def this() = this("unknown")
+
+  def newCounter(group: String, counter: Counter) = {
+    debug("Add new counter %s %s %s." format (group, counter.getName, counter))
+    putAndGetGroup(group).putIfAbsent(counter.getName, counter)
+    val realCounter = metrics.get(group).get(counter.getName).asInstanceOf[Counter]
+    listeners.foreach(_.onCounter(group, realCounter))
+    realCounter
+  }
+
   def newCounter(group: String, name: String) = {
     debug("Creating new counter %s %s." format (group, name))
-    putAndGetGroup(group).putIfAbsent(name, new Counter(name))
-    val counter = metrics.get(group).get(name).asInstanceOf[Counter]
-    listeners.foreach(_.onCounter(group, counter))
-    counter
+    newCounter(group, new Counter(name))
+  }
+
+  def newGauge[T](group: String, gauge: Gauge[T]) = {
+    debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge))
+    putAndGetGroup(group).putIfAbsent(gauge.getName, gauge)
+    val realGauge = metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]]
+    listeners.foreach(_.onGauge(group, realGauge))
+    realGauge
   }
 
   def newGauge[T](group: String, name: String, value: T) = {
     debug("Creating new gauge %s %s %s." format (group, name, value))
-    putAndGetGroup(group).putIfAbsent(name, new Gauge[T](name, value))
-    val gauge = metrics.get(group).get(name).asInstanceOf[Gauge[T]]
-    listeners.foreach(_.onGauge(group, gauge))
-    gauge
+    newGauge(group, new Gauge[T](name, value))
   }
 
   private def putAndGetGroup(group: String) = {
@@ -54,6 +67,8 @@ class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
     metrics.get(group)
   }
 
+  def getName = name
+
   def getGroups = metrics.keySet()
 
   def getGroup(group: String) = metrics.get(group)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b18f0cc..d24671e 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -21,18 +21,19 @@ package org.apache.samza.system
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.Queue
-import grizzled.slf4j.Logging
+
 import org.apache.samza.serializers.SerdeManager
 
+import grizzled.slf4j.Logging
+
 class SystemConsumers(
   chooser: MessageChooser,
   consumers: Map[String, SystemConsumer],
   serdeManager: SerdeManager,
+  metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
   maxMsgsPerStreamPartition: Int = 1000,
   noNewMessagesTimeout: Long = 10) extends Logging {
 
-  // TODO add metrics
-
   var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
   var neededByChooser = Set[SystemStreamPartition]()
   var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
@@ -42,6 +43,12 @@ class SystemConsumers(
   debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
   debug("Got no new message timeout: %s" format noNewMessagesTimeout)
 
+  metrics.setUnprocessedMessages(() => fetchMap.values.map(maxMsgsPerStreamPartition - _.intValue).sum)
+  metrics.setNeededByChooser(() => neededByChooser.size)
+  metrics.setTimeout(() => timeout)
+  metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
+  metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
+
   def start {
     debug("Starting consumers.")
 
@@ -61,6 +68,8 @@ class SystemConsumers(
     fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
     unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
     consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
+
+    metrics.registerSystem(systemStreamPartition.getSystem)
   }
 
   def choose = {
@@ -69,11 +78,15 @@ class SystemConsumers(
     if (envelopeFromChooser == null) {
       debug("Chooser returned null.")
 
+      metrics.choseNull.inc
+
       // Allow blocking if the chooser didn't choose a message.
       timeout = noNewMessagesTimeout
     } else {
       debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
 
+      metrics.choseObject.inc
+
       // Don't block if we have a message to process.
       timeout = 0
 
@@ -104,6 +117,8 @@ class SystemConsumers(
   private def poll(systemName: String) = {
     debug("Polling system consumer: %s" format systemName)
 
+    metrics.systemPolls(systemName).inc
+
     val consumer = consumers(systemName)
 
     debug("Filtering for system: %s, %s" format (systemName, fetchMap))
@@ -112,10 +127,14 @@ class SystemConsumers(
 
     debug("Fetching: %s" format systemFetchMap)
 
+    metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchMap.size)
+
     val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
 
     debug("Got incoming message envelopes: %s" format incomingEnvelopes)
 
+    metrics.systemMessagesPerPoll(systemName).inc
+
     // We have new un-processed envelopes, so update maps accordingly.
     incomingEnvelopes.foreach(envelope => {
       val systemStreamPartition = envelope.getSystemStreamPartition

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
new file mode 100644
index 0000000..9b3160d
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for [[SystemConsumers]]: counts of chooser results (a message
+ * versus null), per-system poll counters, and gauges backed by
+ * caller-supplied functions.
+ *
+ * @param registry registry in which every counter and gauge below is created.
+ */
+class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val choseNull = newCounter("chose-null")
+  val choseObject = newCounter("chose-object")
+
+  // Per-system counters keyed by system name; populated by registerSystem.
+  val systemPolls = scala.collection.mutable.Map[String, Counter]()
+  val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
+  val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
+
+  // Gauge setters: each creates a gauge with a fixed name that is backed by
+  // the supplied function.
+  def setUnprocessedMessages(getValue: () => Int): Unit =
+    newGauge("unprocessed-messages", getValue)
+
+  def setNeededByChooser(getValue: () => Int): Unit =
+    newGauge("ssps-needed-by-chooser", getValue)
+
+  def setTimeout(getValue: () => Long): Unit =
+    newGauge("poll-timeout", getValue)
+
+  def setMaxMessagesPerStreamPartition(getValue: () => Int): Unit =
+    newGauge("max-buffered-messages-per-stream-partition", getValue)
+
+  def setNoNewMessagesTimeout(getValue: () => Long): Unit =
+    newGauge("blocking-poll-timeout", getValue)
+
+  /**
+   * Creates the poll, SSP-fetches-per-poll and messages-per-poll counters
+   * for systemName. A second call for the same system is a no-op.
+   */
+  def registerSystem(systemName: String): Unit = {
+    val alreadyRegistered = systemPolls.contains(systemName)
+
+    if (!alreadyRegistered) {
+      systemPolls.put(systemName, newCounter("%s-polls" format systemName))
+      systemStreamPartitionFetchesPerPoll.put(systemName, newCounter("%s-ssp-fetches-per-poll" format systemName))
+      systemMessagesPerPoll.put(systemName, newCounter("%s-messages-per-poll" format systemName))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index 099c0bf..8fb36b3 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -20,30 +20,67 @@
 package org.apache.samza.system
 
 import org.apache.samza.serializers.SerdeManager
+import grizzled.slf4j.Logging
 
+/**
+ * Fans lifecycle, registration, flush and send calls out to the
+ * [[SystemProducer]]s, logging each call and updating
+ * [[SystemProducersMetrics]].
+ *
+ * @param producers producer for each system, keyed by system name.
+ * @param serdeManager serializes outgoing envelopes in send.
+ * @param metrics counters updated by register, flush and send.
+ */
 class SystemProducers(
   producers: Map[String, SystemProducer],
-  serdeManager: SerdeManager) {
-
-  // TODO add metrics and logging
+  serdeManager: SerdeManager,
+  metrics: SystemProducersMetrics = new SystemProducersMetrics) extends Logging {
 
+  /** Starts every producer. */
   def start {
+    debug("Starting producers.")
+
     producers.values.foreach(_.start)
   }
 
+  /** Stops every producer. */
   def stop {
+    debug("Stopping producers.")
+
     producers.values.foreach(_.stop)
   }
 
+  /** Registers source with the metrics and then with every producer. */
   def register(source: String) {
+    debug("Registering source: %s" format source)
+
+    metrics.registerSource(source)
+
     producers.values.foreach(_.register(source))
   }
 
+  /** Flushes source on every producer; source must already be registered. */
   def flush(source: String) {
+    debug("Flushing source: %s" format source)
+
+    metrics.flushes.inc
+    metrics.sourceFlushes(source).inc
+
     producers.values.foreach(_.flush(source))
   }
 
+  /**
+   * Serializes envelope and hands it to the producer for the envelope's
+   * system. source must already be registered (its per-source counter is
+   * looked up by name).
+   */
   def send(source: String, envelope: OutgoingMessageEnvelope) {
+    // Arguments follow the message text: source first, then envelope.
+    trace("Sending message from source: %s, %s" format (source, envelope))
+
+    metrics.sends.inc
+    metrics.sourceSends(source).inc
+
     producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
new file mode 100644
index 0000000..594dd51
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.Counter
+
+/**
+ * Metrics for [[SystemProducers]]: total flush and send counts, plus
+ * per-source flush and send counters created by registerSource.
+ *
+ * @param registry registry in which every counter here is created.
+ */
+class SystemProducersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val flushes = newCounter("flushes")
+  val sends = newCounter("sends")
+  val sourceFlushes = scala.collection.mutable.Map[String, Counter]()
+  val sourceSends = scala.collection.mutable.Map[String, Counter]()
+
+  /**
+   * Creates the "<source>-flushes" and "<source>-sends" counters. Registering
+   * a source a second time keeps the existing counters rather than creating
+   * duplicates under the same names (same guard as
+   * SystemConsumersMetrics.registerSystem).
+   */
+  def registerSource(source: String) {
+    if (!sourceFlushes.contains(source)) {
+      sourceFlushes += source -> newCounter("%s-flushes" format source)
+      sourceSends += source -> newCounter("%s-sends" format source)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
new file mode 100644
index 0000000..0a62bd0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.container.SamzaContainerMetrics
+
+class TestMetricsHelper {
+  /**
+   * Checks that the metrics group of SamzaContainerMetrics equals its fully
+   * qualified class name. NOTE(review): despite "PackageName" in the method
+   * name, the expected value is the class name, not the package name.
+   */
+  @Test
+  def testMetricsHelperGroupShouldBePAckageName {
+    val expectedGroup = classOf[SamzaContainerMetrics].getName
+    val actualGroup = new SamzaContainerMetrics().group
+
+    assertEquals(expectedGroup, actualGroup)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index 8827697..357b290 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -64,7 +64,7 @@ class TestJmxReporter {
   @Test
   def testJmxReporter {
     val registry = new MetricsRegistryMap
-    val jvm = new JvmMetrics("test", registry)
+    val jvm = new JvmMetrics(registry)
     val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]()))
 
     reporter.register("test", registry)
@@ -72,7 +72,7 @@ class TestJmxReporter {
     jvm.run
 
     val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection
-    val stateViaJMX = mbserver.getAttribute(new ObjectName("test:type=test,name=MemNonHeapUsedM"), "Value").asInstanceOf[Float]
+    val stateViaJMX = mbserver.getAttribute(new ObjectName("org.apache.samza.metrics.JvmMetrics:type=test,name=mem-non-heap-used-mb"), "Value").asInstanceOf[Float]
 
     assertTrue(stateViaJMX > 0)
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 214de92..f4f616e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -45,11 +45,10 @@ abstract class BrokerProxy(
   val port: Int,
   val system: String,
   val clientID: String,
-  val metricsRegistry: MetricsRegistry,
-  tpMetrics: TopicAndPartitionMetrics,
+  val metrics: KafkaSystemConsumerMetrics,
   val timeout: Int = Int.MaxValue,
   val bufferSize: Int = 1024000,
-  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging with BrokerProxyMetrics {
+  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
 
   val messageSink: MessageSink
 
@@ -70,6 +69,8 @@ abstract class BrokerProxy(
 
   var simpleConsumer = createSimpleConsumer()
 
+  metrics.registerBrokerProxy(host, port)
+
   def createSimpleConsumer() = {
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
@@ -88,13 +89,13 @@ abstract class BrokerProxy(
     val offset = offsetGetter.getNextOffset(simpleConsumer, tp, lastCheckpointedOffset)
     nextOffsets += tp -> offset
 
-    tpGaugeInc
+    metrics.topicPartitions(host, port).set(nextOffsets.size)
   }
 
   def removeTopicPartition(tp: TopicAndPartition) = {
     if (nextOffsets.containsKey(tp)) {
       nextOffsets.remove(tp)
-      tpGaugeDec
+      metrics.topicPartitions(host, port).set(nextOffsets.size)
       debug("Removed %s" format tp)
     } else {
       warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys().mkString(",")))
@@ -125,7 +126,7 @@ abstract class BrokerProxy(
               debug("Stack trace for fetchMessages exception.", e)
               simpleConsumer.close()
               simpleConsumer = createSimpleConsumer()
-              reconnectCounter.inc
+              metrics.reconnects(host, port).inc
             }
           }
         }
@@ -135,6 +136,7 @@ abstract class BrokerProxy(
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
 
   private def fetchMessages(): Unit = {
+    metrics.brokerReads(host, port).inc
     val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
     firstCall = false
     firstCallBarrier.countDown()
@@ -177,9 +179,11 @@ abstract class BrokerProxy(
 
         nextOffset = message.nextOffset
 
-        tpMetrics.getReadsCounter(tp).inc
-        tpMetrics.getBytesReadCounter(tp).inc(message.message.payloadSize + message.message.keySize)
-        tpMetrics.getOffsetCounter(tp).set(nextOffset)
+        val bytesSize = message.message.payloadSize + message.message.keySize
+        metrics.reads(tp).inc
+        metrics.bytesRead(tp).inc(bytesSize)
+        metrics.brokerBytesRead(host, port).inc(bytesSize)
+        metrics.offsets(tp).set(nextOffset)
       }
 
       nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
@@ -187,7 +191,7 @@ abstract class BrokerProxy(
       // Update high water mark
       val hw = data.hw
       if (hw >= 0) {
-        getLagGauge(tp).set(hw - nextOffset)
+        metrics.lag(tp).set(hw - nextOffset)
       } else {
         debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
       }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
deleted file mode 100644
index bdd91da..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import org.apache.samza.metrics.{Gauge, Counter, MetricsRegistry}
-import collection.mutable
-import kafka.common.TopicAndPartition
-
-private[kafka] trait BrokerProxyMetrics {
-  self:BrokerProxy =>
-  // TODO: Move topic-partition specific metrics out of brokerproxy, into system
-  val metricsRegistry:MetricsRegistry
-
-  def newCounter = metricsRegistry.newCounter(metricsGroup, _:String)
-
-  val hostPort = host + ":"  + port
-  val metricsGroup = "samza.kafka.brokerproxy"
-
-  // Counters
-  val reconnectCounter = newCounter("%s-Reconnects" format hostPort)
-
-
-
-  // Gauges
-  val lagGauges = mutable.Map[TopicAndPartition, Gauge[Long]]()
-  def getLagGauge(tp:TopicAndPartition) = lagGauges.getOrElseUpdate(tp, metricsRegistry.newGauge[Long](metricsGroup, "%s-MessagesBehindHighWaterMark" format tp, 0l))
-
-  val tpGauge = metricsRegistry.newGauge[Long](metricsGroup, "%s-NumberOfTopicsPartitions" format hostPort, 0)
-
-  def tpGaugeInc = tpGauge.set(tpGauge.getValue + 1l)
-
-  def tpGaugeDec = tpGauge.set(tpGauge.getValue - 1l)
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 7970ffc..b8e17ce 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -54,7 +54,7 @@ object KafkaSystemConsumer {
 private[kafka] class KafkaSystemConsumer(
   systemName: String,
   brokerListString: String,
-  metricsRegistry: MetricsRegistry,
+  metrics: KafkaSystemConsumerMetrics,
   clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
   timeout: Int = Int.MaxValue,
   bufferSize: Int = 1024000,
@@ -63,20 +63,18 @@ private[kafka] class KafkaSystemConsumer(
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metricsRegistry, new Clock {
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock {
   def currentTimeMillis = clock()
 }) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
   var lastReadOffsets = Map[SystemStreamPartition, String]()
-  val topicAndPartitionMetrics = new TopicAndPartitionMetrics(metricsRegistry)
 
   def start() {
     val topicPartitionsAndOffsets = lastReadOffsets.map {
       case (systemStreamPartition, offset) =>
         val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
-        topicAndPartitionMetrics.addNewTopicAndPartition(topicAndPartition)
         (topicAndPartition, offset)
     }
 
@@ -89,6 +87,8 @@ private[kafka] class KafkaSystemConsumer(
     super.register(systemStreamPartition, lastReadOffset)
 
     lastReadOffsets += systemStreamPartition -> lastReadOffset
+
+    metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
   }
 
   def stop() {
@@ -122,7 +122,7 @@ private[kafka] class KafkaSystemConsumer(
 
             brokerOption match {
               case Some(broker) =>
-                val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metricsRegistry, topicAndPartitionMetrics, timeout, bufferSize, offsetGetter) {
+                val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
                   val messageSink: MessageSink = sink
                 })
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
new file mode 100644
index 0000000..1012d58
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsRegistry
+import java.util.concurrent.ConcurrentHashMap
+import kafka.common.TopicAndPartition
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+
+/**
+ * Metrics for a Kafka system consumer, kept per topic-partition and per
+ * broker (host, port). getPrefix returns "<systemName>-" (presumably
+ * prepended to each metric name by MetricsHelper -- confirm).
+ *
+ * One instance is shared by all of a consumer's BrokerProxies, which update
+ * it from their fetch threads; hence the ConcurrentHashMaps.
+ *
+ * @param systemName name of the Kafka system; used in getPrefix.
+ * @param registry registry in which every counter and gauge here is created.
+ */
+class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  // Per topic-partition metrics, created by registerTopicAndPartition.
+  val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+
+  // Per broker metrics keyed by the BrokerProxy's (host, port), created by
+  // registerBrokerProxy.
+  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
+  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+
+  /**
+   * Creates the offset, bytes-read, reads and lag metrics for tp. If tp is
+   * already registered, its existing metrics are kept.
+   */
+  def registerTopicAndPartition(tp: TopicAndPartition) = {
+    // Must be containsKey: ConcurrentHashMap.contains(Object) is the legacy
+    // Hashtable method that searches the *values*, so it never matched a
+    // TopicAndPartition and re-registration replaced live metrics.
+    if (!offsets.containsKey(tp)) {
+      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-reads" format (tp.topic, tp.partition)))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
+    }
+  }
+
+  /**
+   * Creates the reconnects, bytes-read, reads and topic-partitions metrics
+   * for the broker at host:port, replacing any previously registered ones.
+   */
+  def registerBrokerProxy(host: String, port: Int) {
+    reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
+    brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
+    brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, port)))
+    topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
+  }
+
+  /** Prefix for metric names in this class: the system name plus "-". */
+  override def getPrefix = systemName + "-"
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index fe96dd8..ba08af8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -41,6 +41,7 @@ import org.apache.samza.system.SystemFactory
 class KafkaSystemFactory extends SystemFactory {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
+    val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
     // Kind of goofy to need a producer config for consumers, but we need metadata.
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
@@ -67,7 +68,7 @@ class KafkaSystemFactory extends SystemFactory {
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,
-      metricsRegistry = registry,
+      metrics = metrics,
       clientId = clientId,
       timeout = timeout,
       bufferSize = bufferSize,
@@ -83,6 +84,7 @@ class KafkaSystemFactory extends SystemFactory {
     val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
       .getOrElse(10000)
     val getProducer = () => { new Producer[Object, Object](producerConfig) }
+    val metrics = new KafkaSystemProducerMetrics(systemName, registry)
 
     // Unlike consumer, no need to use encoders here, since they come for free 
     // inside the producer configs. Kafka's producer will handle all of this 
@@ -92,8 +94,8 @@ class KafkaSystemFactory extends SystemFactory {
       systemName,
       batchSize,
       reconnectIntervalMs,
-      registry,
-      getProducer)
+      getProducer,
+      metrics)
   }
 
   def getAdmin(systemName: String, config: Config) = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index e35227f..a419783 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -27,23 +27,17 @@ import kafka.producer.KeyedMessage
 import kafka.producer.Producer
 import kafka.producer.ProducerConfig
 import org.apache.samza.config.Config
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.OutgoingMessageEnvelope
 
-object KafkaSystemProducerMetrics {
-  val metricsGroup = "samza.kafka.producer"
-}
-
 class KafkaSystemProducer(
   systemName: String,
   batchSize: Int,
   reconnectIntervalMs: Long,
-  registry: MetricsRegistry,
-  getProducer: () => Producer[Object, Object]) extends SystemProducer with Logging {
+  getProducer: () => Producer[Object, Object],
+  metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging {
 
-  val flushReconnectCounter = registry.newCounter(KafkaSystemProducerMetrics.metricsGroup, "Producer-%s-Reconnects" format systemName)
   var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
   var producer: Producer[Object, Object] = null
 
@@ -58,9 +52,15 @@ class KafkaSystemProducer(
 
   def register(source: String) {
     sourceBuffers += source -> ArrayBuffer()
+
+    metrics.setBufferSize(source, () => sourceBuffers(source).size)
   }
 
   def send(source: String, envelope: OutgoingMessageEnvelope) {
+    debug("Enqueueing message: %s, %s." format (source, envelope))
+
+    metrics.sends.inc
+
     sourceBuffers(source) += new KeyedMessage[Object, Object](
       envelope.getSystemStream.getStream,
       envelope.getKey,
@@ -78,6 +78,8 @@ class KafkaSystemProducer(
 
     debug("Flushing buffer with size: %s." format buffer.size)
 
+    metrics.flushes.inc
+
     while (!done) {
       try {
         if (producer == null) {
@@ -88,12 +90,13 @@ class KafkaSystemProducer(
 
         producer.send(buffer: _*)
         done = true
+        metrics.flushSizes.inc(buffer.size)
       } catch {
         case e: Throwable =>
           warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, e.getMessage))
           debug("Exception while producing to %s." format systemName, e)
 
-          flushReconnectCounter.inc
+          metrics.reconnects.inc
 
           if (producer != null) {
             producer.close

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
new file mode 100644
index 0000000..ad39157
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.system.SystemStream
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.Partition
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsRegistry
+
+/**
+ * Metrics for a Kafka system producer: reconnect, send and flush counts, the
+ * total number of messages in successful flushes, and one buffer-size gauge
+ * per source.
+ *
+ * @param systemName name of the Kafka system; getPrefix returns it plus "-".
+ * @param registry registry in which every counter and gauge here is created.
+ */
+class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val reconnects = newCounter("producer-reconnects")
+  val sends = newCounter("producer-sends")
+  val flushes = newCounter("flushes")
+  val flushSizes = newCounter("flush-sizes")
+
+  /** Creates a "<source>-producer-buffer-size" gauge backed by getValue. */
+  def setBufferSize(source: String, getValue: () => Int): Unit =
+    newGauge("%s-producer-buffer-size" format source, getValue)
+
+  override def getPrefix = "%s-" format systemName
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
deleted file mode 100644
index 75fc022..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.system.kafka
-
-import org.apache.samza.metrics.{MetricsRegistry, Counter}
-import kafka.common.TopicAndPartition
-import java.util.concurrent.ConcurrentHashMap
-import grizzled.slf4j.Logging
-
-/**
- * Wrapper around the metrics that BrokerProxies will be updating per topic partition.  Multiple BrokerProxies will
- * be updating the map at the same time, but no two BrokerProxies should be updating the same key at the same time.
- *
- * @param metricsRegistry Registry to hook counters into.
- */
-private[kafka] class TopicAndPartitionMetrics(metricsRegistry:MetricsRegistry) extends Logging {
-  val metricsGroup = "KafkaSystem"
-
-  val counters = new ConcurrentHashMap[TopicAndPartition, (Counter,Counter,Counter)]()
-
-  def addNewTopicAndPartition(tp:TopicAndPartition) = {
-    if(containsTopicAndPartition(tp)) {
-      warn("TopicAndPartitionsMetrics already has an entry for topic-partition %s, not adding." format tp)
-    } else {
-      counters.put(tp, (newCounter("%s-OffsetChange" format tp),  newCounter("%s-BytesRead" format tp), newCounter("%s-Reads" format tp)))
-    }
-  }
-
-  def containsTopicAndPartition(tp:TopicAndPartition) = counters.containsKey(tp)
-
-  def newCounter = metricsRegistry.newCounter(metricsGroup, _:String)
-
-  def getOffsetCounter(tp:TopicAndPartition) = counters.get(tp)._1
-
-  def getBytesReadCounter(tp:TopicAndPartition) = counters.get(tp)._2
-
-  def getReadsCounter(tp:TopicAndPartition) = counters.get(tp)._3
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 947f5a7..85f5887 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -54,31 +54,24 @@ class TestBrokerProxy {
       def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
     }
 
+    val system = "daSystem"
     val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
-      "systems.daSystem.Redbird.consumer.auto.offset.reset" -> "largest"))
-    val metricsRegistry = {
-      val registry = Mockito.mock(classOf[MetricsRegistry])
-      val gauge = Mockito.mock(classOf[Gauge[Long]])
-      when(gauge.getValue()).thenReturn(0l)
-      when(registry.newGauge[Long](anyString(), anyString(), anyLong())).thenReturn(gauge)
-
-      val counter = Mockito.mock(classOf[Counter])
-      when(registry.newCounter(anyString(), anyString())).thenReturn(counter)
-      registry
-    }
-
+      "systems.%s.Redbird.consumer.auto.offset.reset".format(system) -> "largest"))
+    val host = "host"
+    val port = 2222
     val tp = new TopicAndPartition("Redbird", 2012)
-    val tpMetrics = new TopicAndPartitionMetrics(metricsRegistry)
+    val metrics = new KafkaSystemConsumerMetrics(system)
 
-    tpMetrics.addNewTopicAndPartition(tp)
+    metrics.registerBrokerProxy(host, port)
+    metrics.registerTopicAndPartition(tp)
+    metrics.topicPartitions(host, port).set(1)
 
     val bp = new BrokerProxy(
-      "host",
-      2222,
-      "daSystem",
+      host,
+      port,
+      system,
       "daClientId",
-      metricsRegistry,
-      tpMetrics,
+      metrics,
       offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
 
       val messageSink: MessageSink = sink

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index a21e3bf..cd0942a 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -51,13 +51,13 @@ class TestKafkaSystemProducer {
     val props = getProps
     @volatile var msgsSent = new CountDownLatch(1)
 
-    val producer = new KafkaSystemProducer("test", 1, 100, new MetricsRegistryMap, () => {
+    val producer = new KafkaSystemProducer("test", 1, 100, () => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgsSent.countDown
         }
       }
-    })
+    }, new KafkaSystemProducerMetrics)
 
     producer.register("test")
     producer.start
@@ -72,13 +72,13 @@ class TestKafkaSystemProducer {
     val props = getProps
     @volatile var msgsSent = 0
 
-    val producer = new KafkaSystemProducer("test", 2, 100, new MetricsRegistryMap, () => {
+    val producer = new KafkaSystemProducer("test", 2, 100, () => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgsSent += 1
         }
       }
-    })
+    }, new KafkaSystemProducerMetrics)
 
     // second message should trigger the count down
     producer.register("test")
@@ -97,13 +97,13 @@ class TestKafkaSystemProducer {
     val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
     val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b")
 
-    val producer = new KafkaSystemProducer("test", 3, 100, new MetricsRegistryMap, () => {
+    val producer = new KafkaSystemProducer("test", 3, 100, () => {
       new Producer[Object, Object](new ProducerConfig(props)) {
         override def send(messages: KeyedMessage[Object, Object]*) {
           msgs ++= messages.map(_.message.asInstanceOf[String])
         }
       }
-    })
+    }, new KafkaSystemProducerMetrics)
 
     // flush should trigger the count down
     producer.register("test")
@@ -128,7 +128,7 @@ class TestKafkaSystemProducer {
     props.put("producer.type", "sync")
 
     var failCount = 0
-    val producer = new KafkaSystemProducer("test", 1, 100, new MetricsRegistryMap, () => {
+    val producer = new KafkaSystemProducer("test", 1, 100, () => {
       failCount += 1
       if (failCount <= 5) {
         throw new RuntimeException("Pretend to fail in factory")
@@ -144,7 +144,7 @@ class TestKafkaSystemProducer {
           }
         }
       }
-    })
+    }, new KafkaSystemProducerMetrics)
 
     producer.register("test")
     producer.start

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index bd479b6..6685f85 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -42,9 +42,11 @@ import grizzled.slf4j.Logging
  * @param cacheEntries The number of entries to hold in the in memory-cache
  * @param writeBatchSize The number of entries to batch together before forcing a write
  */
-class CachedStore[K, V](val store: KeyValueStore[K, V],
+class CachedStore[K, V](
+  val store: KeyValueStore[K, V],
   val cacheSize: Int,
-  val writeBatchSize: Int) extends KeyValueStore[K, V] with Logging {
+  val writeBatchSize: Int,
+  val metrics: CachedStoreMetrics = new CachedStoreMetrics) extends KeyValueStore[K, V] with Logging {
 
   /** the number of items in the dirty list */
   private var dirtyCount = 0
@@ -66,9 +68,15 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     }
   }
 
+  metrics.setDirtyCount(() => dirtyCount)
+  metrics.setCacheSize(() => cache.size)
+
   def get(key: K) = {
+    metrics.gets.inc
+
     val c = cache.get(key)
     if (c != null) {
+      metrics.cacheHits.inc
       c.value
     } else {
       val v = store.get(key)
@@ -78,16 +86,20 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
   }
 
   def range(from: K, to: K) = {
+    metrics.ranges.inc // counted before flush(), which separately bumps metrics.flushes
     flush()
     store.range(from, to)
   }
 
   def all() = {
+    metrics.alls.inc // counted before flush(), which separately bumps metrics.flushes
     flush()
     store.all()
   }
 
   def put(key: K, value: V) {
+    metrics.puts.inc
+
     // add the key to the front of the dirty list (and remove any prior occurrences to dedupe)
     val found = cache.get(key)
     if (found == null || found.dirty == null) {
@@ -95,7 +107,7 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     } else {
       // If we are removing the head of the list, move the head to the next 
       // element. See SAMZA-45 for details.
-      if(found.dirty.prev == null) {
+      if (found.dirty.prev == null) {
         this.dirty = found.dirty.next
         this.dirty.prev = null
       } else {
@@ -123,6 +135,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
   def flush() {
     trace("Flushing.")
 
+    metrics.flushes.inc
+
     // write out the contents of the dirty list oldest first
     val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
     for (k <- this.dirty.reverse) {
@@ -132,6 +146,7 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     }
     store.putAll(batch)
     store.flush
+    metrics.flushBatchSize.inc(batch.size)
 
     // reset the dirty list
     this.dirty = new mutable.DoubleLinkedList[K]()
@@ -153,6 +168,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
    * Perform the local delete and log it out to the changelog
    */
   def delete(key: K) {
+    metrics.deletes.inc // put(key, null) below also bumps metrics.puts, so a delete counts in both
+
     put(key, null.asInstanceOf[V])
   }
 
@@ -160,10 +177,9 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     trace("Closing.")
 
     flush
-    
+
     store.close
   }
-
 }
 
 private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
new file mode 100644
index 0000000..8ffcfb1
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for [[CachedStore]]: one counter per store operation, a cache-hit
+ * counter, and gauges for the dirty-list length and the number of cached entries.
+ *
+ * @param storeName Name of the store; used to build the metric-name prefix (see getPrefix).
+ * @param registry Registry the counters and gauges are created in.
+ */
+class CachedStoreMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+  // Bumped once per call; cacheHits only when get() is answered from the cache.
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val cacheHits = newCounter("cache-hits")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+  // Despite the name, a running total: each flush adds the number of entries it wrote.
+  val flushBatchSize = newCounter("flush-batch-size")
+
+  /** Creates the "dirty-count" gauge, whose value is supplied by `getValue`. */
+  def setDirtyCount(getValue: () => Int): Unit = newGauge("dirty-count", getValue)
+
+  /** Creates the "cache-size" gauge, whose value is supplied by `getValue`. */
+  def setCacheSize(getValue: () => Int): Unit = newGauge("cache-size", getValue)
+
+  /** Metric-name prefix for this store, e.g. "mystore-" (presumably prepended by MetricsHelper). */
+  override def getPrefix = "%s-" format storeName
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 92eee38..fc22383 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -37,17 +37,41 @@ import org.apache.samza.storage.StorageEngine
 class KeyValueStorageEngine[K, V](
   db: KeyValueStore[K, V],
   rawDb: KeyValueStore[Array[Byte], Array[Byte]],
+  metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
   batchSize: Int = 500) extends StorageEngine with KeyValueStore[K, V] with Logging {
 
   var count = 0
 
   /* delegate to underlying store */
-  def get(key: K): V = db.get(key)
-  def put(key: K, value: V) = db.put(key, value)
-  def putAll(entries: java.util.List[Entry[K, V]]) = db.putAll(entries)
-  def delete(key: K) = db.delete(key)
-  def range(from: K, to: K) = db.range(from, to)
-  def all() = db.all()
+  /** Point lookup: bumps `metrics.gets`, then reads from the wrapped store. */
+  def get(key: K): V = { metrics.gets.inc; db.get(key) }
+
+  /** Single write: bumps `metrics.puts`, then writes through to the wrapped store. */
+  def put(key: K, value: V) = { metrics.puts.inc; db.put(key, value) }
+
+  /**
+   * Batch write. Each entry in `entries` counts as one put, so `metrics.puts`
+   * tracks individual writes rather than the number of putAll calls.
+   */
+  def putAll(entries: java.util.List[Entry[K, V]]) = {
+    metrics.puts.inc(entries.size)
+    db.putAll(entries)
+  }
+
+  /** Delete: bumps `metrics.deletes`, then deletes from the wrapped store. */
+  def delete(key: K) = { metrics.deletes.inc; db.delete(key) }
+
+  /**
+   * Range scan: bumps `metrics.ranges` once per call and returns the wrapped
+   * store's iterator unchanged.
+   */
+  def range(from: K, to: K) = { metrics.ranges.inc; db.range(from, to) }
+
+  /**
+   * Full scan: bumps `metrics.alls` once per call and returns the wrapped
+   * store's iterator unchanged.
+   */
+  def all() = { metrics.alls.inc; db.all() }
 
   /**
    * Restore the contents of this key/value store from the change log,
@@ -57,13 +81,18 @@ class KeyValueStorageEngine[K, V](
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
 
     for (envelope <- envelopes) {
-      batch.add(new Entry(envelope.getKey.asInstanceOf[Array[Byte]], envelope.getMessage.asInstanceOf[Array[Byte]]))
+      val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
+      val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
+      // valBytes may be null (presumably a changelog delete); putAll treats a null value as a delete.
+      batch.add(new Entry(keyBytes, valBytes))
 
       if (batch.size >= batchSize) {
         rawDb.putAll(batch)
         batch.clear()
       }
 
+      metrics.restoredBytes.inc(keyBytes.size + (if (valBytes == null) 0 else valBytes.size))
+      metrics.restoredMessages.inc
       count += 1
 
       if (count % 1000000 == 0) {
@@ -79,6 +108,8 @@ class KeyValueStorageEngine[K, V](
   def flush() = {
     trace("Flushing.")
 
+    metrics.flushes.inc // counted before delegating, so a db.flush that throws is still counted
+
     db.flush
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
index dbdefa0..d42f46e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -57,22 +57,27 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
       throw new SamzaException("Must define a message serde when using key value storage.")
     }
 
-    val levelDb = new LevelDbKeyValueStore(storeDir, LevelDbKeyValueStore.options(storageConfig))
+    val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
+    val levelDb = new LevelDbKeyValueStore(storeDir, LevelDbKeyValueStore.options(storageConfig), levelDbMetrics)
     val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
       levelDb
     } else {
-      new LoggedStore(levelDb, changeLogSystemStreamPartition, collector)
+      val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
+      new LoggedStore(levelDb, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
     }
-    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde)
+    val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+    val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
     val maybeCachedStore = if (enableCache) {
-      new CachedStore(serialized, cacheSize, batchSize)
+      val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
+      new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
     } else {
       serialized
     }
     val db = maybeCachedStore
+    val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
 
     // Decide if we should use raw bytes when restoring
 
-    new KeyValueStorageEngine(db, levelDb, batchSize)
+    new KeyValueStorageEngine(db, levelDb, keyValueStorageEngineMetrics, batchSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
new file mode 100644
index 0000000..a7958f6
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for [[KeyValueStorageEngine]]: one counter per store operation, plus
+ * counters for the messages and bytes replayed while restoring from the changelog.
+ *
+ * @param storeName Name of the store; used to build the metric-name prefix (see getPrefix).
+ * @param registry Registry the counters are created in.
+ */
+class KeyValueStorageEngineMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+  // Operation counters, bumped once per call (putAll bumps puts once per entry).
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+
+  // Restore progress. NOTE: restoredBytes is registered as "messages-bytes";
+  // renaming it would change the reported metric name.
+  val restoredMessages = newCounter("messages-restored")
+  val restoredBytes = newCounter("messages-bytes")
+
+  /** Metric-name prefix for this store, e.g. "mystore-" (presumably prepended by MetricsHelper). */
+  override def getPrefix = "%s-" format storeName
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index ac6da25..d37acf9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -44,57 +44,76 @@ object LevelDbKeyValueStore {
 
 class LevelDbKeyValueStore(
   val dir: File,
-  val options: Options) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+  val options: Options,
+  val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
 
   private lazy val db = factory.open(dir, options)
   private val lexicographic = new LexicographicComparator()
 
   def get(key: Array[Byte]): Array[Byte] = {
+    metrics.gets.inc // counted before the null-key check, so rejected calls are counted too
     require(key != null, "Null key not allowed.")
     val found = db.get(key)
-    if (found == null)
-      null
-    else
-      found
+    if (found != null) { // a miss returns null and adds nothing to bytesRead
+      metrics.bytesRead.inc(found.size) // value bytes only; iterator next() also counts key bytes
+    }
+    found
   }
 
   def put(key: Array[Byte], value: Array[Byte]) {
+    metrics.puts.inc // counted even when value is null, i.e. when this call performs a delete
     require(key != null, "Null key not allowed.")
-    if (value == null)
+    if (value == null) { // a null value means "delete this key"
       db.delete(key)
-    else
+    } else {
+      metrics.bytesWritten.inc(key.size + value.size) // key + value bytes per stored entry
       db.put(key, value)
+    }
   }
 
   def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
     val batch = db.createWriteBatch()
     val iter = entries.iterator
+    var wrote = 0 // every entry, deletes included; reported as puts (mirrors put(key, null))
+    var deletes = 0 // entries with a null value; additionally reported as deletes
     while (iter.hasNext) {
+      wrote += 1
       val curr = iter.next()
-      if (curr.getValue == null)
+      if (curr.getValue == null) { // null value => delete the key in this batch
+        deletes += 1
         batch.delete(curr.getKey)
-      else
-        batch.put(curr.getKey, curr.getValue)
+      } else {
+        val key = curr.getKey
+        val value = curr.getValue
+        metrics.bytesWritten.inc(key.size + value.size) // bumped per entry, before db.write below
+        batch.put(key, value)
+      }
     }
     db.write(batch)
+    metrics.puts.inc(wrote) // puts/deletes are only bumped once db.write has returned
+    metrics.deletes.inc(deletes)
   }
 
   def delete(key: Array[Byte]) {
+    metrics.deletes.inc // put(key, null) below also bumps metrics.puts, so a delete counts in both
     put(key, null)
   }
 
   def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.ranges.inc // counts range calls only; no bytes are counted here
     require(from != null && to != null, "Null bound not allowed.")
     new LevelDbRangeIterator(db.iterator, from, to)
   }
 
   def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    metrics.alls.inc // counts full-scan calls only; no bytes are counted here
     val iter = db.iterator()
     iter.seekToFirst()
     new LevelDbIterator(iter)
   }
 
   def flush {
+    metrics.flushes.inc // counted even though this flush writes nothing (see TODO below)
     // TODO can't find a flush for leveldb
     trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
   }
@@ -114,8 +133,14 @@ class LevelDbKeyValueStore(
     def remove() = iter.remove()
     def hasNext() = iter.hasNext()
     def next() = {
-      val curr = iter.next()
-      new Entry(curr.getKey, curr.getValue)
+      val curr = iter.next
+      val key = curr.getKey
+      val value = curr.getValue
+      metrics.bytesRead.inc(key.size) // iterated entries count key bytes (get() counts only values)
+      if (value != null) { // guard kept: a null value would otherwise throw here
+        metrics.bytesRead.inc(value.size)
+      }
+      new Entry(key, value)
     }
     override def finalize() {
       if (open) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
new file mode 100644
index 0000000..8949f2f
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for [[LevelDbKeyValueStore]]: one counter per store operation plus
+ * running totals of the bytes written to and read from the store.
+ *
+ * @param storeName Name of the store; used to build the metric-name prefix (see getPrefix).
+ * @param registry Registry the counters are created in.
+ */
+class LevelDbKeyValueStoreMetrics(
+  val storeName: String = "unknown",
+  val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+  // Operation counters. delete() also bumps puts, since it is implemented as put(key, null).
+  val gets = newCounter("gets")
+  val ranges = newCounter("ranges")
+  val alls = newCounter("alls")
+  val puts = newCounter("puts")
+  val deletes = newCounter("deletes")
+  val flushes = newCounter("flushes")
+
+  // Byte totals: writes add key + value; get adds the value; iterators add key + value per entry.
+  val bytesWritten = newCounter("bytes-written")
+  val bytesRead = newCounter("bytes-read")
+
+  /** Metric-name prefix for this store, e.g. "mystore-" (presumably prepended by MetricsHelper). */
+  override def getPrefix = "%s-" format storeName
+}
\ No newline at end of file