You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/09/20 23:52:43 UTC
[2/2] git commit: SAMZA-25; adding metrics to everything.
SAMZA-25; adding metrics to everything.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/bb0abb67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/bb0abb67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/bb0abb67
Branch: refs/heads/master
Commit: bb0abb670892806d46c64d158cabae8990aceb09
Parents: 819b996
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Fri Sep 20 14:52:21 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Fri Sep 20 14:52:21 2013 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../apache/samza/metrics/MetricsRegistry.java | 4 ++
.../apache/samza/util/NoOpMetricsRegistry.java | 10 ++++
.../apache/samza/container/SamzaContainer.scala | 32 ++++++++--
.../samza/container/SamzaContainerMetrics.scala | 15 +++--
.../apache/samza/container/TaskInstance.scala | 20 +++++++
.../samza/container/TaskInstanceMetrics.scala | 21 +++++--
.../org/apache/samza/metrics/JvmMetrics.scala | 32 +++++-----
.../apache/samza/metrics/MetricsHelper.scala | 50 ++++++++++++++++
.../samza/metrics/MetricsRegistryMap.scala | 33 ++++++++---
.../apache/samza/system/SystemConsumers.scala | 25 +++++++-
.../samza/system/SystemConsumersMetrics.scala | 61 +++++++++++++++++++
.../apache/samza/system/SystemProducers.scala | 24 +++++++-
.../samza/system/SystemProducersMetrics.scala | 37 ++++++++++++
.../samza/metrics/TestMetricsHelper.scala | 31 ++++++++++
.../metrics/reporter/TestJmxReporter.scala | 4 +-
.../apache/samza/system/kafka/BrokerProxy.scala | 24 ++++----
.../samza/system/kafka/BrokerProxyMetrics.scala | 52 ----------------
.../system/kafka/KafkaSystemConsumer.scala | 10 ++--
.../kafka/KafkaSystemConsumerMetrics.scala | 62 ++++++++++++++++++++
.../samza/system/kafka/KafkaSystemFactory.scala | 8 ++-
.../system/kafka/KafkaSystemProducer.scala | 21 ++++---
.../kafka/KafkaSystemProducerMetrics.scala | 41 +++++++++++++
.../system/kafka/TopicAndPartitionMetrics.scala | 56 ------------------
.../samza/system/kafka/TestBrokerProxy.scala | 31 ++++------
.../system/kafka/TestKafkaSystemProducer.scala | 16 ++---
.../apache/samza/storage/kv/CachedStore.scala | 26 ++++++--
.../samza/storage/kv/CachedStoreMetrics.scala | 49 ++++++++++++++++
.../storage/kv/KeyValueStorageEngine.scala | 45 +++++++++++---
.../kv/KeyValueStorageEngineFactory.scala | 15 +++--
.../kv/KeyValueStorageEngineMetrics.scala | 42 +++++++++++++
.../samza/storage/kv/LevelDbKeyValueStore.scala | 49 ++++++++++++----
.../kv/LevelDbKeyValueStoreMetrics.scala | 41 +++++++++++++
.../apache/samza/storage/kv/LoggedStore.scala | 48 ++++++++++-----
.../samza/storage/kv/LoggedStoreMetrics.scala | 39 ++++++++++++
.../storage/kv/SerializedKeyValueStore.scala | 38 ++++++++++--
.../kv/SerializedKeyValueStoreMetrics.scala | 41 +++++++++++++
.../samza/job/yarn/SamzaAppMasterMetrics.scala | 32 +++++-----
38 files changed, 908 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4c4e99c..077e44b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -154,6 +154,7 @@ project(":samza-kv_$scalaVersion") {
dependencies {
compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion"
compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
index a4563c4..9df1ef6 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
@@ -22,5 +22,9 @@ package org.apache.samza.metrics;
public interface MetricsRegistry {
Counter newCounter(String group, String name);
+ Counter newCounter(String group, Counter counter);
+
<T> Gauge<T> newGauge(String group, String name, T value);
+
+ <T> Gauge<T> newGauge(String group, Gauge<T> value);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
index c071c47..8bc0764 100644
--- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
+++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
@@ -30,7 +30,17 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
}
@Override
+ public Counter newCounter(String group, Counter counter) {
+ return counter;
+ }
+
+ @Override
public <T> Gauge<T> newGauge(String group, String name, T value) {
return new Gauge<T>(name, value);
}
+
+ @Override
+ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
+ return gauge;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2d2efdd..62bd243 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -55,6 +55,9 @@ import org.apache.samza.system.SystemProducers
import org.apache.samza.task.ReadableCollector
import org.apache.samza.system.DefaultChooser
import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducersMetrics
+import org.apache.samza.system.SystemConsumersMetrics
+import org.apache.samza.metrics.MetricsRegistryMap
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -83,7 +86,10 @@ object SamzaContainer extends Logging {
info("Using partitions: %s" format partitions)
info("Using configuration: %s" format config)
- val samzaContainerMetrics = new SamzaContainerMetrics(containerName)
+ val registry = new MetricsRegistryMap(containerName)
+ val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
+ val systemProducersMetrics = new SystemProducersMetrics(registry)
+ val systemConsumersMetrics = new SystemConsumersMetrics(registry)
val inputStreams = config.getInputStreams
val inputSystems = inputStreams.map(_.getSystem)
@@ -261,11 +267,13 @@ object SamzaContainer extends Logging {
// TODO add config values for no new message timeout and max msgs per stream partition
chooser = chooser,
consumers = consumers,
- serdeManager = serdeManager)
+ serdeManager = serdeManager,
+ metrics = systemConsumersMetrics)
val producerMultiplexer = new SystemProducers(
producers = producers,
- serdeManager = serdeManager)
+ serdeManager = serdeManager,
+ metrics = systemProducersMetrics)
val listeners = config.getLifecycleListeners match {
case Some(listeners) => {
@@ -324,7 +332,7 @@ object SamzaContainer extends Logging {
val collector = new ReadableCollector
- val taskInstanceMetrics = new TaskInstanceMetrics(partition)
+ val taskInstanceMetrics = new TaskInstanceMetrics("Partition-%s" format partition.getPartitionId)
val storeConsumers = changeLogSystemStreams
.map {
@@ -402,8 +410,8 @@ object SamzaContainer extends Logging {
config = config,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
- checkpointManager = checkpointManager,
metrics = samzaContainerMetrics,
+ checkpointManager = checkpointManager,
reporters = reporters,
jvm = jvm)
}
@@ -414,8 +422,8 @@ class SamzaContainer(
config: Config,
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
+ metrics: SamzaContainerMetrics,
checkpointManager: CheckpointManager = null,
- metrics: SamzaContainerMetrics = new SamzaContainerMetrics,
reporters: Map[String, MetricsReporter] = Map(),
jvm: JvmMetrics = null) extends Runnable with Logging {
@@ -530,6 +538,8 @@ class SamzaContainer(
def process(coordinator: ReadableCoordinator) {
trace("Attempting to choose a message to process.")
+ metrics.processes.inc
+
val envelope = consumerMultiplexer.choose
if (envelope != null) {
@@ -537,27 +547,37 @@ class SamzaContainer(
trace("Processing incoming message envelope for partition %s." format partition)
+ metrics.envelopes.inc
+
taskInstances(partition).process(envelope, coordinator)
} else {
trace("No incoming message envelope was available.")
+
+ metrics.nullEnvelopes.inc
}
}
def window(coordinator: ReadableCoordinator) {
trace("Windowing stream tasks.")
+ metrics.windows.inc
+
taskInstances.values.foreach(_.window(coordinator))
}
def send {
trace("Triggering send in task instances.")
+ metrics.sends.inc
+
taskInstances.values.foreach(_.send)
}
def commit(coordinator: ReadableCoordinator) {
trace("Committing task instances.")
+ metrics.commits.inc
+
taskInstances.values.foreach(_.commit(coordinator))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 81cf356..bcb3fa3 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -21,13 +21,16 @@ package org.apache.samza.container
import org.apache.samza.metrics.ReadableMetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsHelper
class SamzaContainerMetrics(
- val containerName: String = "unnamed-container",
- val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+ val source: String = "unknown",
+ val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
- val source = containerName
- val commits = registry.newCounter("samza.task.SamzaContainer", "commits")
-
- // TODO .. etc ..
+ val commits = newCounter("commit-calls")
+ val windows = newCounter("window-calls")
+ val processes = newCounter("process-calls")
+ val sends = newCounter("send-calls")
+ val envelopes = newCounter("process-envelopes")
+ val nullEnvelopes = newCounter("process-null-envelopes")
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c61994d..d7eff62 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -42,6 +42,7 @@ import org.apache.samza.task.ReadableCollector
import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.SystemProducers
import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.metrics.Gauge
class TaskInstance(
task: StreamTask,
@@ -136,6 +137,8 @@ class TaskInstance(
for ((systemStream, offset) <- checkpoint.getOffsets) {
if (!resetInputStreams.getOrElse(systemStream, false)) {
offsets += systemStream -> offset
+
+ metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
} else {
info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream))
}
@@ -156,6 +159,8 @@ class TaskInstance(
}
def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
+ metrics.processes.inc
+
listeners.foreach(_.beforeProcess(envelope, config, context))
trace("Processing incoming message envelope for partition: %s, %s" format (partition, envelope.getSystemStreamPartition))
@@ -173,12 +178,16 @@ class TaskInstance(
if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < clock()) {
trace("Windowing for partition: %s" format partition)
+ metrics.windows.inc
+
task.asInstanceOf[WindowableTask].window(collector, coordinator)
lastWindowMs = clock()
trace("Assigned last window time for partition: %s, %s" format (partition, lastWindowMs))
} else {
trace("Skipping window for partition: %s" format partition)
+
+ metrics.windowsSkipped.inc
}
}
@@ -186,6 +195,9 @@ class TaskInstance(
if (collector.envelopes.size > 0) {
trace("Sending messages for partition: %s, %s" format (partition, collector.envelopes.size))
+ metrics.sends.inc
+ metrics.messagesSent.inc(collector.envelopes.size)
+
collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
trace("Resetting collector for partition: %s" format partition)
@@ -193,6 +205,8 @@ class TaskInstance(
collector.reset
} else {
trace("Skipping send for partition %s because no messages were collected." format partition)
+
+ metrics.sendsSkipped.inc
}
}
@@ -200,6 +214,8 @@ class TaskInstance(
if (lastCommitMs + commitMs < clock() || coordinator.isCommitRequested || coordinator.isShutdownRequested) {
trace("Flushing state stores for partition: %s" format partition)
+ metrics.commits.inc
+
storageManager.flush
trace("Flushing producers for partition: %s" format partition)
@@ -213,6 +229,10 @@ class TaskInstance(
}
lastCommitMs = clock()
+ } else {
+ trace("Skipping commit for partition: %s" format partition)
+
+ metrics.commitsSkipped.inc
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 07d72c7..46f1f17 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -22,13 +22,24 @@ package org.apache.samza.container
import org.apache.samza.metrics.ReadableMetricsRegistry
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.Partition
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.system.SystemStream
+import org.apache.samza.metrics.Gauge
class TaskInstanceMetrics(
- val partition: Partition,
- val registry: ReadableMetricsRegistry = new MetricsRegistryMap) {
+ val source: String = "unknown",
+ val registry: ReadableMetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
- val source = "Partition-%s" format partition.getPartitionId
- val commits = registry.newCounter("samza.task.TaskInstance", "commits")
+ val commits = newCounter("commit-calls")
+ val commitsSkipped = newCounter("commit-skipped")
+ val windows = newCounter("window-calls")
+ val windowsSkipped = newCounter("window-skipped")
+ val processes = newCounter("process-calls")
+ val sends = newCounter("send-calls")
+ val sendsSkipped = newCounter("send-skipped")
+ val messagesSent = newCounter("messages-sent")
- // TODO .. etc ..
+ def addOffsetGauge(systemStream: SystemStream, getValue: () => String) {
+ newGauge("%s-%s-offset" format (systemStream.getSystem, systemStream.getStream), getValue)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index 164a2ee..301a5a0 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
@@ -32,10 +32,8 @@ import org.apache.samza.util.DaemonThreadFactory
/**
* Straight up ripoff of Hadoop's metrics2 JvmMetrics class.
*/
-class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with Logging {
- final val M = 1024 * 1024.0f
-
- def this(registry: MetricsRegistry) = this("samza.jvm", registry)
+class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runnable with Logging {
+ final val M = 1024 * 1024.0F
val memoryMXBean = ManagementFactory.getMemoryMXBean()
val gcBeans = ManagementFactory.getGarbageCollectorMXBeans()
@@ -44,18 +42,18 @@ class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with
val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory)
// jvm metrics
- val gMemNonHeapUsedM = registry.newGauge[Float](group, "MemNonHeapUsedM", 0)
- val gMemNonHeapCommittedM = registry.newGauge[Float](group, "MemNonHeapCommittedM", 0)
- val gMemHeapUsedM = registry.newGauge[Float](group, "MemHeapUsedM", 0)
- val gMemHeapCommittedM = registry.newGauge[Float](group, "MemHeapCommittedM", 0)
- val gThreadsNew = registry.newGauge[Long](group, "ThreadsNew", 0)
- val gThreadsRunnable = registry.newGauge[Long](group, "ThreadsRunnable", 0)
- val gThreadsBlocked = registry.newGauge[Long](group, "ThreadsBlocked", 0)
- val gThreadsWaiting = registry.newGauge[Long](group, "ThreadsWaiting", 0)
- val gThreadsTimedWaiting = registry.newGauge[Long](group, "ThreadsTimedWaiting", 0)
- val gThreadsTerminated = registry.newGauge[Long](group, "ThreadsTerminated", 0)
- val cGcCount = registry.newCounter(group, "GcCount")
- val cGcTimeMillis = registry.newCounter(group, "GcTimeMillis")
+ val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F)
+ val gMemNonHeapCommittedM = newGauge("mem-non-heap-committed-mb", 0.0F)
+ val gMemHeapUsedM = newGauge("mem-heap-used-mb", 0.0F)
+ val gMemHeapCommittedM = newGauge("mem-heap-committed-mb", 0.0F)
+ val gThreadsNew = newGauge("threads-new", 0L)
+ val gThreadsRunnable = newGauge("threads-runnable", 0L)
+ val gThreadsBlocked = newGauge("threads-blocked", 0L)
+ val gThreadsWaiting = newGauge("threads-waiting", 0L)
+ val gThreadsTimedWaiting = newGauge("threads-timed-waiting", 0L)
+ val gThreadsTerminated = newGauge("threads-terminated", 0L)
+ val cGcCount = newCounter("gc-count")
+ val cGcTimeMillis = newCounter("gc-time-millis")
def start {
executor.scheduleWithFixedDelay(this, 0, 5, TimeUnit.SECONDS)
@@ -107,7 +105,7 @@ class JvmMetrics(group: String, registry: MetricsRegistry) extends Runnable with
gcBeanCounters.get(gcName) match {
case Some(gcBeanCounterTuple) => gcBeanCounterTuple
case _ => {
- val t = (registry.newCounter(group, "GcCount" + gcName), registry.newCounter(group, "GcTimeMillis" + gcName))
+ val t = (newCounter("%s-gc-count" format gcName), newCounter("%s-gc-time-millis" format gcName))
gcBeanCounters += (gcName -> t)
t
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
new file mode 100644
index 0000000..b412e46
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+/**
+ * MetricsHelper is a little helper class to make it easy to register and
+ * manage counters and gauges. Metrics are registered under the runtime class
+ * name (getClass.getName) as their group, named (getPrefix + name) lower-cased.
+ * Function-backed gauges evaluate the function at creation and on every read.
+ */
+trait MetricsHelper {
+ val group = this.getClass.getName
+ val registry: MetricsRegistry
+
+ def newCounter(name: String) = {
+ registry.newCounter(group, (getPrefix + name).toLowerCase)
+ }
+
+ def newGauge[T](name: String, value: T) = {
+ registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value))
+ }
+
+ def newGauge[T](name: String, value: () => T) = {
+ registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) {
+ override def getValue = value()
+ })
+ }
+
+ /**
+ * Returns a prefix for metric names.
+ */
+ def getPrefix = ""
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index fc0bd38..da83f20 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -18,6 +18,7 @@
*/
package org.apache.samza.metrics
+
import grizzled.slf4j.Logging
import java.util.concurrent.ConcurrentHashMap
@@ -25,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap
* A class that holds all metrics registered with it. It can be registered
* with one or more MetricReporters to flush metrics.
*/
-class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
+class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with Logging {
var listeners = Set[ReadableMetricsRegistryListener]()
/*
@@ -33,20 +34,32 @@ class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
*/
val metrics = new ConcurrentHashMap[String, ConcurrentHashMap[String, Metric]]
+ def this() = this("unknown")
+
+ def newCounter(group: String, counter: Counter) = {
+ debug("Add new counter %s %s %s." format (group, counter.getName, counter))
+ putAndGetGroup(group).putIfAbsent(counter.getName, counter)
+ val realCounter = metrics.get(group).get(counter.getName).asInstanceOf[Counter]
+ listeners.foreach(_.onCounter(group, realCounter))
+ realCounter
+ }
+
def newCounter(group: String, name: String) = {
debug("Creating new counter %s %s." format (group, name))
- putAndGetGroup(group).putIfAbsent(name, new Counter(name))
- val counter = metrics.get(group).get(name).asInstanceOf[Counter]
- listeners.foreach(_.onCounter(group, counter))
- counter
+ newCounter(group, new Counter(name))
+ }
+
+ def newGauge[T](group: String, gauge: Gauge[T]) = {
+ debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge))
+ putAndGetGroup(group).putIfAbsent(gauge.getName, gauge)
+ val realGauge = metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]]
+ listeners.foreach(_.onGauge(group, realGauge))
+ realGauge
}
def newGauge[T](group: String, name: String, value: T) = {
debug("Creating new gauge %s %s %s." format (group, name, value))
- putAndGetGroup(group).putIfAbsent(name, new Gauge[T](name, value))
- val gauge = metrics.get(group).get(name).asInstanceOf[Gauge[T]]
- listeners.foreach(_.onGauge(group, gauge))
- gauge
+ newGauge(group, new Gauge[T](name, value))
}
private def putAndGetGroup(group: String) = {
@@ -54,6 +67,8 @@ class MetricsRegistryMap extends ReadableMetricsRegistry with Logging {
metrics.get(group)
}
+ def getName = name
+
def getGroups = metrics.keySet()
def getGroup(group: String) = metrics.get(group)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index b18f0cc..d24671e 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -21,18 +21,19 @@ package org.apache.samza.system
import scala.collection.JavaConversions._
import scala.collection.mutable.Queue
-import grizzled.slf4j.Logging
+
import org.apache.samza.serializers.SerdeManager
+import grizzled.slf4j.Logging
+
class SystemConsumers(
chooser: MessageChooser,
consumers: Map[String, SystemConsumer],
serdeManager: SerdeManager,
+ metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
maxMsgsPerStreamPartition: Int = 1000,
noNewMessagesTimeout: Long = 10) extends Logging {
- // TODO add metrics
-
var unprocessedMessages = Map[SystemStreamPartition, Queue[IncomingMessageEnvelope]]()
var neededByChooser = Set[SystemStreamPartition]()
var fetchMap = Map[SystemStreamPartition, java.lang.Integer]()
@@ -42,6 +43,12 @@ class SystemConsumers(
debug("Got max messages per stream: %s" format maxMsgsPerStreamPartition)
debug("Got no new message timeout: %s" format noNewMessagesTimeout)
+ metrics.setUnprocessedMessages(() => fetchMap.values.map(maxMsgsPerStreamPartition - _.intValue).sum)
+ metrics.setNeededByChooser(() => neededByChooser.size)
+ metrics.setTimeout(() => timeout)
+ metrics.setMaxMessagesPerStreamPartition(() => maxMsgsPerStreamPartition)
+ metrics.setNoNewMessagesTimeout(() => noNewMessagesTimeout)
+
def start {
debug("Starting consumers.")
@@ -61,6 +68,8 @@ class SystemConsumers(
fetchMap += systemStreamPartition -> maxMsgsPerStreamPartition
unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
consumers(systemStreamPartition.getSystem).register(systemStreamPartition, lastReadOffset)
+
+ metrics.registerSystem(systemStreamPartition.getSystem)
}
def choose = {
@@ -69,11 +78,15 @@ class SystemConsumers(
if (envelopeFromChooser == null) {
debug("Chooser returned null.")
+ metrics.choseNull.inc
+
// Allow blocking if the chooser didn't choose a message.
timeout = noNewMessagesTimeout
} else {
debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+ metrics.choseObject.inc
+
// Don't block if we have a message to process.
timeout = 0
@@ -104,6 +117,8 @@ class SystemConsumers(
private def poll(systemName: String) = {
debug("Polling system consumer: %s" format systemName)
+ metrics.systemPolls(systemName).inc
+
val consumer = consumers(systemName)
debug("Filtering for system: %s, %s" format (systemName, fetchMap))
@@ -112,10 +127,14 @@ class SystemConsumers(
debug("Fetching: %s" format systemFetchMap)
+ metrics.systemStreamPartitionFetchesPerPoll(systemName).inc(systemFetchMap.size)
+
val incomingEnvelopes = consumer.poll(systemFetchMap, timeout)
debug("Got incoming message envelopes: %s" format incomingEnvelopes)
+ metrics.systemMessagesPerPoll(systemName).inc(incomingEnvelopes.size)
+
// We have new un-processed envelopes, so update maps accordingly.
incomingEnvelopes.foreach(envelope => {
val systemStreamPartition = envelope.getSystemStreamPartition
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
new file mode 100644
index 0000000..9b3160d
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+ val choseNull = newCounter("chose-null")
+ val choseObject = newCounter("chose-object")
+ val systemPolls = scala.collection.mutable.Map[String, Counter]()
+ val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
+ val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
+
+ def setUnprocessedMessages(getValue: () => Int) {
+ newGauge("unprocessed-messages", getValue)
+ }
+
+ def setNeededByChooser(getValue: () => Int) {
+ newGauge("ssps-needed-by-chooser", getValue)
+ }
+
+ def setTimeout(getValue: () => Long) {
+ newGauge("poll-timeout", getValue)
+ }
+
+ def setMaxMessagesPerStreamPartition(getValue: () => Int) {
+ newGauge("max-buffered-messages-per-stream-partition", getValue)
+ }
+
+ def setNoNewMessagesTimeout(getValue: () => Long) {
+ newGauge("blocking-poll-timeout", getValue)
+ }
+
+ def registerSystem(systemName: String) {
+ if (!systemPolls.contains(systemName)) {
+ systemPolls += systemName -> newCounter("%s-polls" format systemName)
+ systemStreamPartitionFetchesPerPoll += systemName -> newCounter("%s-ssp-fetches-per-poll" format systemName)
+ systemMessagesPerPoll += systemName -> newCounter("%s-messages-per-poll" format systemName)
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
index 099c0bf..8fb36b3 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala
@@ -20,30 +20,48 @@
package org.apache.samza.system
import org.apache.samza.serializers.SerdeManager
+import grizzled.slf4j.Logging
+/**
+ * Front end for a container's SystemProducers. Lifecycle and per-source calls
+ * (start, stop, register, flush) are forwarded to every producer, while send
+ * serializes the envelope with serdeManager and hands it to the producer for
+ * the envelope's system.
+ *
+ * @param producers system name -> SystemProducer for that system.
+ * @param serdeManager converts outgoing envelopes to bytes before sending.
+ * @param metrics counters updated on register, flush and send.
+ */
class SystemProducers(
producers: Map[String, SystemProducer],
- serdeManager: SerdeManager) {
-
- // TODO add metrics and logging
+ serdeManager: SerdeManager,
+ metrics: SystemProducersMetrics = new SystemProducersMetrics) extends Logging {
+ /** Starts every producer. */
def start {
+ debug("Starting producers.")
+
producers.values.foreach(_.start)
}
+ /** Stops every producer. */
def stop {
+ debug("Stopping producers.")
+
producers.values.foreach(_.stop)
}
+ /**
+ * Registers `source` with every producer and creates its per-source
+ * counters. Must happen before flush/send for that source, because those
+ * methods look the per-source counters up by source name.
+ */
def register(source: String) {
+ debug("Registering source: %s" format source)
+
+ metrics.registerSource(source)
+
producers.values.foreach(_.register(source))
}
+ /**
+ * Flushes `source` on every producer.
+ *
+ * @throws NoSuchElementException if `source` was never registered.
+ */
def flush(source: String) {
+ debug("Flushing source: %s" format source)
+
+ metrics.flushes.inc
+ metrics.sourceFlushes(source).inc
+
producers.values.foreach(_.flush(source))
}
+ /**
+ * Serializes `envelope` and sends it through the producer for its system.
+ *
+ * @throws NoSuchElementException if `source` was never registered, or if
+ *         there is no producer for the envelope's system.
+ */
def send(source: String, envelope: OutgoingMessageEnvelope) {
+ // Arguments follow the message text: the source first, then the envelope.
+ trace("Sending message from source: %s, %s" format (source, envelope))
+
+ metrics.sends.inc
+ metrics.sourceSends(source).inc
+
producers(envelope.getSystemStream.getSystem).send(source, serdeManager.toBytes(envelope))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
new file mode 100644
index 0000000..594dd51
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducersMetrics.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.Counter
+
+/**
+ * Counters for [[SystemProducers]]: overall "sends" / "flushes" counts plus a
+ * "<source>-sends" and "<source>-flushes" counter per registered source.
+ *
+ * @param registry registry the counters are created in (via [[MetricsHelper]]).
+ */
+class SystemProducersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val flushes = newCounter("flushes")
+  val sends = newCounter("sends")
+  val sourceFlushes = scala.collection.mutable.Map[String, Counter]()
+  val sourceSends = scala.collection.mutable.Map[String, Counter]()
+
+  /**
+   * Creates the per-source counters for `source`. Registering a source that
+   * is already known is a no-op, so its existing counters (and their counts)
+   * are kept instead of being replaced by fresh ones.
+   */
+  def registerSource(source: String) {
+    if (!sourceFlushes.contains(source)) {
+      sourceFlushes += source -> newCounter("%s-flushes" format source)
+      sourceSends += source -> newCounter("%s-sends" format source)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
new file mode 100644
index 0000000..0a62bd0
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.container.SamzaContainerMetrics
+
+/**
+ * Tests for [[MetricsHelper]].
+ */
+class TestMetricsHelper {
+  /**
+   * The metric group of a MetricsHelper is the fully-qualified name of the
+   * concrete metrics class (checked here with SamzaContainerMetrics).
+   */
+  @Test
+  def testMetricsHelperGroupShouldBeClassName {
+    assertEquals(classOf[SamzaContainerMetrics].getName, new SamzaContainerMetrics().group)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index 8827697..357b290 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -64,7 +64,7 @@ class TestJmxReporter {
@Test
def testJmxReporter {
val registry = new MetricsRegistryMap
- val jvm = new JvmMetrics("test", registry)
+ val jvm = new JvmMetrics(registry)
val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]()))
reporter.register("test", registry)
@@ -72,7 +72,7 @@ class TestJmxReporter {
jvm.run
val mbserver = JMXConnectorFactory.connect(url).getMBeanServerConnection
- val stateViaJMX = mbserver.getAttribute(new ObjectName("test:type=test,name=MemNonHeapUsedM"), "Value").asInstanceOf[Float]
+ val stateViaJMX = mbserver.getAttribute(new ObjectName("org.apache.samza.metrics.JvmMetrics:type=test,name=mem-non-heap-used-mb"), "Value").asInstanceOf[Float]
assertTrue(stateViaJMX > 0)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 214de92..f4f616e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -45,11 +45,10 @@ abstract class BrokerProxy(
val port: Int,
val system: String,
val clientID: String,
- val metricsRegistry: MetricsRegistry,
- tpMetrics: TopicAndPartitionMetrics,
+ val metrics: KafkaSystemConsumerMetrics,
val timeout: Int = Int.MaxValue,
val bufferSize: Int = 1024000,
- offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging with BrokerProxyMetrics {
+ offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
val messageSink: MessageSink
@@ -70,6 +69,8 @@ abstract class BrokerProxy(
var simpleConsumer = createSimpleConsumer()
+ metrics.registerBrokerProxy(host, port)
+
def createSimpleConsumer() = {
val hostString = "%s:%d" format (host, port)
info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
@@ -88,13 +89,13 @@ abstract class BrokerProxy(
val offset = offsetGetter.getNextOffset(simpleConsumer, tp, lastCheckpointedOffset)
nextOffsets += tp -> offset
- tpGaugeInc
+ metrics.topicPartitions(host, port).set(nextOffsets.size)
}
def removeTopicPartition(tp: TopicAndPartition) = {
if (nextOffsets.containsKey(tp)) {
nextOffsets.remove(tp)
- tpGaugeDec
+ metrics.topicPartitions(host, port).set(nextOffsets.size)
debug("Removed %s" format tp)
} else {
warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys().mkString(",")))
@@ -125,7 +126,7 @@ abstract class BrokerProxy(
debug("Stack trace for fetchMessages exception.", e)
simpleConsumer.close()
simpleConsumer = createSimpleConsumer()
- reconnectCounter.inc
+ metrics.reconnects(host, port).inc
}
}
}
@@ -135,6 +136,7 @@ abstract class BrokerProxy(
}, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
private def fetchMessages(): Unit = {
+ metrics.brokerReads(host, port).inc
val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
firstCall = false
firstCallBarrier.countDown()
@@ -177,9 +179,11 @@ abstract class BrokerProxy(
nextOffset = message.nextOffset
- tpMetrics.getReadsCounter(tp).inc
- tpMetrics.getBytesReadCounter(tp).inc(message.message.payloadSize + message.message.keySize)
- tpMetrics.getOffsetCounter(tp).set(nextOffset)
+ val bytesSize = message.message.payloadSize + message.message.keySize
+ metrics.reads(tp).inc
+ metrics.bytesRead(tp).inc(bytesSize)
+ metrics.brokerBytesRead(host, port).inc(bytesSize)
+ metrics.offsets(tp).set(nextOffset)
}
nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
@@ -187,7 +191,7 @@ abstract class BrokerProxy(
// Update high water mark
val hw = data.hw
if (hw >= 0) {
- getLagGauge(tp).set(hw - nextOffset)
+ metrics.lag(tp).set(hw - nextOffset)
} else {
debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
deleted file mode 100644
index bdd91da..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import org.apache.samza.metrics.{Gauge, Counter, MetricsRegistry}
-import collection.mutable
-import kafka.common.TopicAndPartition
-
-private[kafka] trait BrokerProxyMetrics {
- self:BrokerProxy =>
- // TODO: Move topic-partition specific metrics out of brokerproxy, into system
- val metricsRegistry:MetricsRegistry
-
- def newCounter = metricsRegistry.newCounter(metricsGroup, _:String)
-
- val hostPort = host + ":" + port
- val metricsGroup = "samza.kafka.brokerproxy"
-
- // Counters
- val reconnectCounter = newCounter("%s-Reconnects" format hostPort)
-
-
-
- // Gauges
- val lagGauges = mutable.Map[TopicAndPartition, Gauge[Long]]()
- def getLagGauge(tp:TopicAndPartition) = lagGauges.getOrElseUpdate(tp, metricsRegistry.newGauge[Long](metricsGroup, "%s-MessagesBehindHighWaterMark" format tp, 0l))
-
- val tpGauge = metricsRegistry.newGauge[Long](metricsGroup, "%s-NumberOfTopicsPartitions" format hostPort, 0)
-
- def tpGaugeInc = tpGauge.set(tpGauge.getValue + 1l)
-
- def tpGaugeDec = tpGauge.set(tpGauge.getValue - 1l)
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 7970ffc..b8e17ce 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -54,7 +54,7 @@ object KafkaSystemConsumer {
private[kafka] class KafkaSystemConsumer(
systemName: String,
brokerListString: String,
- metricsRegistry: MetricsRegistry,
+ metrics: KafkaSystemConsumerMetrics,
clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
timeout: Int = Int.MaxValue,
bufferSize: Int = 1024000,
@@ -63,20 +63,18 @@ private[kafka] class KafkaSystemConsumer(
offsetGetter: GetOffset = new GetOffset("fail"),
deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
- clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metricsRegistry, new Clock {
+ clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock {
def currentTimeMillis = clock()
}) with Toss with Logging {
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
var lastReadOffsets = Map[SystemStreamPartition, String]()
- val topicAndPartitionMetrics = new TopicAndPartitionMetrics(metricsRegistry)
def start() {
val topicPartitionsAndOffsets = lastReadOffsets.map {
case (systemStreamPartition, offset) =>
val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
- topicAndPartitionMetrics.addNewTopicAndPartition(topicAndPartition)
(topicAndPartition, offset)
}
@@ -89,6 +87,8 @@ private[kafka] class KafkaSystemConsumer(
super.register(systemStreamPartition, lastReadOffset)
lastReadOffsets += systemStreamPartition -> lastReadOffset
+
+ metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
}
def stop() {
@@ -122,7 +122,7 @@ private[kafka] class KafkaSystemConsumer(
brokerOption match {
case Some(broker) =>
- val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metricsRegistry, topicAndPartitionMetrics, timeout, bufferSize, offsetGetter) {
+ val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, timeout, bufferSize, offsetGetter) {
val messageSink: MessageSink = sink
})
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
new file mode 100644
index 0000000..1012d58
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsRegistry
+import java.util.concurrent.ConcurrentHashMap
+import kafka.common.TopicAndPartition
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+
+/**
+ * Metrics for a [[KafkaSystemConsumer]], created in `registry` through
+ * [[MetricsHelper]]. `getPrefix` returns "<systemName>-" (presumably prepended
+ * by MetricsHelper to every metric name).
+ *
+ * @param systemName name of the Kafka system; used for the metric-name prefix.
+ * @param registry   registry that the counters and gauges are created in.
+ */
+class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  // Per-topic-partition metrics, populated by registerTopicAndPartition.
+  val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+
+  /*
+   * Per-broker metrics, populated by registerBrokerProxy.
+   * (String, Int) = (host, port) of BrokerProxy.
+   */
+
+  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
+  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+
+  /**
+   * Creates the offset-change, bytes-read and reads counters and the
+   * messages-behind-high-watermark gauge for `tp`. Registering a partition
+   * that is already known is a no-op, so its existing metrics are kept.
+   */
+  def registerTopicAndPartition(tp: TopicAndPartition) = {
+    // Must be containsKey: ConcurrentHashMap.contains(x) is the legacy
+    // Hashtable method that searches the map's values, so it never matched a
+    // TopicAndPartition key and every call re-created the metrics.
+    if (!offsets.containsKey(tp)) {
+      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-reads" format (tp.topic, tp.partition)))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
+    }
+  }
+
+  /**
+   * Creates the reconnects, bytes-read and reads counters and the
+   * topic-partitions gauge for the broker at (host, port). Registering the
+   * same broker again is a no-op, so its existing metrics are kept.
+   */
+  def registerBrokerProxy(host: String, port: Int) {
+    val hostPort = (host, port)
+
+    if (!reconnects.containsKey(hostPort)) {
+      reconnects.put(hostPort, newCounter("%s-%s-reconnects" format (host, port)))
+      brokerBytesRead.put(hostPort, newCounter("%s-%s-bytes-read" format (host, port)))
+      brokerReads.put(hostPort, newCounter("%s-%s-reads" format (host, port)))
+      topicPartitions.put(hostPort, newGauge("%s-%s-topic-partitions" format (host, port), 0))
+    }
+  }
+
+  override def getPrefix = systemName + "-"
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index fe96dd8..ba08af8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -41,6 +41,7 @@ import org.apache.samza.system.SystemFactory
class KafkaSystemFactory extends SystemFactory {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
val clientId = KafkaUtil.getClientId("samza-consumer", config)
+ val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
// Kind of goofy to need a producer config for consumers, but we need metadata.
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
@@ -67,7 +68,7 @@ class KafkaSystemFactory extends SystemFactory {
new KafkaSystemConsumer(
systemName = systemName,
brokerListString = brokerListString,
- metricsRegistry = registry,
+ metrics = metrics,
clientId = clientId,
timeout = timeout,
bufferSize = bufferSize,
@@ -83,6 +84,7 @@ class KafkaSystemFactory extends SystemFactory {
val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
.getOrElse(10000)
val getProducer = () => { new Producer[Object, Object](producerConfig) }
+ val metrics = new KafkaSystemProducerMetrics(systemName, registry)
// Unlike consumer, no need to use encoders here, since they come for free
// inside the producer configs. Kafka's producer will handle all of this
@@ -92,8 +94,8 @@ class KafkaSystemFactory extends SystemFactory {
systemName,
batchSize,
reconnectIntervalMs,
- registry,
- getProducer)
+ getProducer,
+ metrics)
}
def getAdmin(systemName: String, config: Config) = {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index e35227f..a419783 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -27,23 +27,17 @@ import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import org.apache.samza.config.Config
-import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.KafkaUtil
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.OutgoingMessageEnvelope
-object KafkaSystemProducerMetrics {
- val metricsGroup = "samza.kafka.producer"
-}
-
class KafkaSystemProducer(
systemName: String,
batchSize: Int,
reconnectIntervalMs: Long,
- registry: MetricsRegistry,
- getProducer: () => Producer[Object, Object]) extends SystemProducer with Logging {
+ getProducer: () => Producer[Object, Object],
+ metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging {
- val flushReconnectCounter = registry.newCounter(KafkaSystemProducerMetrics.metricsGroup, "Producer-%s-Reconnects" format systemName)
var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
var producer: Producer[Object, Object] = null
@@ -58,9 +52,15 @@ class KafkaSystemProducer(
def register(source: String) {
sourceBuffers += source -> ArrayBuffer()
+
+ metrics.setBufferSize(source, () => sourceBuffers(source).size)
}
def send(source: String, envelope: OutgoingMessageEnvelope) {
+ debug("Enqueueing message: %s, %s." format (source, envelope))
+
+ metrics.sends.inc
+
sourceBuffers(source) += new KeyedMessage[Object, Object](
envelope.getSystemStream.getStream,
envelope.getKey,
@@ -78,6 +78,8 @@ class KafkaSystemProducer(
debug("Flushing buffer with size: %s." format buffer.size)
+ metrics.flushes.inc
+
while (!done) {
try {
if (producer == null) {
@@ -88,12 +90,13 @@ class KafkaSystemProducer(
producer.send(buffer: _*)
done = true
+ metrics.flushSizes.inc(buffer.size)
} catch {
case e: Throwable =>
warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, e.getMessage))
debug("Exception while producing to %s." format systemName, e)
- flushReconnectCounter.inc
+ metrics.reconnects.inc
if (producer != null) {
producer.close
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
new file mode 100644
index 0000000..ad39157
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.system.SystemStream
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.Partition
+import org.apache.samza.metrics.Gauge
+import org.apache.samza.metrics.MetricsRegistry
+
+/**
+ * Metrics for a [[KafkaSystemProducer]], created in `registry` through
+ * [[MetricsHelper]]. `getPrefix` returns "<systemName>-".
+ *
+ * @param systemName name of the Kafka system; used for the metric-name prefix.
+ * @param registry   registry that the counters and gauges are created in.
+ */
+class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  // Counters created once, when the metrics object is constructed.
+  val reconnects = newCounter("producer-reconnects")
+  val sends = newCounter("producer-sends")
+  val flushes = newCounter("flushes")
+  val flushSizes = newCounter("flush-sizes")
+
+  /** Registers a "<source>-producer-buffer-size" gauge whose value is read from `getValue`. */
+  def setBufferSize(source: String, getValue: () => Int): Unit = {
+    val gaugeName = "%s-producer-buffer-size".format(source)
+    newGauge(gaugeName, getValue)
+  }
+
+  override def getPrefix = systemName + "-"
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
deleted file mode 100644
index 75fc022..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.samza.system.kafka
-
-import org.apache.samza.metrics.{MetricsRegistry, Counter}
-import kafka.common.TopicAndPartition
-import java.util.concurrent.ConcurrentHashMap
-import grizzled.slf4j.Logging
-
-/**
- * Wrapper around the metrics that BrokerProxies will be updating per topic partition. Multiple BrokerProxies will
- * be updating the map at the same time, but no two BrokerProxies should be updating the same key at the same time.
- *
- * @param metricsRegistry Registry to hook counters into.
- */
-private[kafka] class TopicAndPartitionMetrics(metricsRegistry:MetricsRegistry) extends Logging {
- val metricsGroup = "KafkaSystem"
-
- val counters = new ConcurrentHashMap[TopicAndPartition, (Counter,Counter,Counter)]()
-
- def addNewTopicAndPartition(tp:TopicAndPartition) = {
- if(containsTopicAndPartition(tp)) {
- warn("TopicAndPartitionsMetrics already has an entry for topic-partition %s, not adding." format tp)
- } else {
- counters.put(tp, (newCounter("%s-OffsetChange" format tp), newCounter("%s-BytesRead" format tp), newCounter("%s-Reads" format tp)))
- }
- }
-
- def containsTopicAndPartition(tp:TopicAndPartition) = counters.containsKey(tp)
-
- def newCounter = metricsRegistry.newCounter(metricsGroup, _:String)
-
- def getOffsetCounter(tp:TopicAndPartition) = counters.get(tp)._1
-
- def getBytesReadCounter(tp:TopicAndPartition) = counters.get(tp)._2
-
- def getReadsCounter(tp:TopicAndPartition) = counters.get(tp)._3
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 947f5a7..85f5887 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -54,31 +54,24 @@ class TestBrokerProxy {
def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
}
+ val system = "daSystem"
val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
- "systems.daSystem.Redbird.consumer.auto.offset.reset" -> "largest"))
- val metricsRegistry = {
- val registry = Mockito.mock(classOf[MetricsRegistry])
- val gauge = Mockito.mock(classOf[Gauge[Long]])
- when(gauge.getValue()).thenReturn(0l)
- when(registry.newGauge[Long](anyString(), anyString(), anyLong())).thenReturn(gauge)
-
- val counter = Mockito.mock(classOf[Counter])
- when(registry.newCounter(anyString(), anyString())).thenReturn(counter)
- registry
- }
-
+ "systems.%s.Redbird.consumer.auto.offset.reset".format(system) -> "largest"))
+ val host = "host"
+ val port = 2222
val tp = new TopicAndPartition("Redbird", 2012)
- val tpMetrics = new TopicAndPartitionMetrics(metricsRegistry)
+ val metrics = new KafkaSystemConsumerMetrics(system)
- tpMetrics.addNewTopicAndPartition(tp)
+ metrics.registerBrokerProxy(host, port)
+ metrics.registerTopicAndPartition(tp)
+ metrics.topicPartitions(host, port).set(1)
val bp = new BrokerProxy(
- "host",
- 2222,
- "daSystem",
+ host,
+ port,
+ system,
"daClientId",
- metricsRegistry,
- tpMetrics,
+ metrics,
offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
val messageSink: MessageSink = sink
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index a21e3bf..cd0942a 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -51,13 +51,13 @@ class TestKafkaSystemProducer {
val props = getProps
@volatile var msgsSent = new CountDownLatch(1)
- val producer = new KafkaSystemProducer("test", 1, 100, new MetricsRegistryMap, () => {
+ val producer = new KafkaSystemProducer("test", 1, 100, () => {
new Producer[Object, Object](new ProducerConfig(props)) {
override def send(messages: KeyedMessage[Object, Object]*) {
msgsSent.countDown
}
}
- })
+ }, new KafkaSystemProducerMetrics)
producer.register("test")
producer.start
@@ -72,13 +72,13 @@ class TestKafkaSystemProducer {
val props = getProps
@volatile var msgsSent = 0
- val producer = new KafkaSystemProducer("test", 2, 100, new MetricsRegistryMap, () => {
+ val producer = new KafkaSystemProducer("test", 2, 100, () => {
new Producer[Object, Object](new ProducerConfig(props)) {
override def send(messages: KeyedMessage[Object, Object]*) {
msgsSent += 1
}
}
- })
+ }, new KafkaSystemProducerMetrics)
// second message should trigger the count down
producer.register("test")
@@ -97,13 +97,13 @@ class TestKafkaSystemProducer {
val msg1 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "a")
val msg2 = new OutgoingMessageEnvelope(new SystemStream("test", "test"), "b")
- val producer = new KafkaSystemProducer("test", 3, 100, new MetricsRegistryMap, () => {
+ val producer = new KafkaSystemProducer("test", 3, 100, () => {
new Producer[Object, Object](new ProducerConfig(props)) {
override def send(messages: KeyedMessage[Object, Object]*) {
msgs ++= messages.map(_.message.asInstanceOf[String])
}
}
- })
+ }, new KafkaSystemProducerMetrics)
// flush should trigger the count down
producer.register("test")
@@ -128,7 +128,7 @@ class TestKafkaSystemProducer {
props.put("producer.type", "sync")
var failCount = 0
- val producer = new KafkaSystemProducer("test", 1, 100, new MetricsRegistryMap, () => {
+ val producer = new KafkaSystemProducer("test", 1, 100, () => {
failCount += 1
if (failCount <= 5) {
throw new RuntimeException("Pretend to fail in factory")
@@ -144,7 +144,7 @@ class TestKafkaSystemProducer {
}
}
}
- })
+ }, new KafkaSystemProducerMetrics)
producer.register("test")
producer.start
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index bd479b6..6685f85 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -42,9 +42,11 @@ import grizzled.slf4j.Logging
* @param cacheEntries The number of entries to hold in the in memory-cache
* @param writeBatchSize The number of entries to batch together before forcing a write
*/
-class CachedStore[K, V](val store: KeyValueStore[K, V],
+class CachedStore[K, V](
+ val store: KeyValueStore[K, V],
val cacheSize: Int,
- val writeBatchSize: Int) extends KeyValueStore[K, V] with Logging {
+ val writeBatchSize: Int,
+ val metrics: CachedStoreMetrics = new CachedStoreMetrics) extends KeyValueStore[K, V] with Logging {
/** the number of items in the dirty list */
private var dirtyCount = 0
@@ -66,9 +68,15 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
}
}
+ metrics.setDirtyCount(() => dirtyCount)
+ metrics.setCacheSize(() => cache.size)
+
def get(key: K) = {
+ metrics.gets.inc
+
val c = cache.get(key)
if (c != null) {
+ metrics.cacheHits.inc
c.value
} else {
val v = store.get(key)
@@ -78,16 +86,20 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
}
def range(from: K, to: K) = {
+ metrics.ranges.inc
flush()
store.range(from, to)
}
def all() = {
+ metrics.alls.inc
flush()
store.all()
}
def put(key: K, value: V) {
+ metrics.puts.inc
+
// add the key to the front of the dirty list (and remove any prior occurrences to dedupe)
val found = cache.get(key)
if (found == null || found.dirty == null) {
@@ -95,7 +107,7 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
} else {
// If we are removing the head of the list, move the head to the next
// element. See SAMZA-45 for details.
- if(found.dirty.prev == null) {
+ if (found.dirty.prev == null) {
this.dirty = found.dirty.next
this.dirty.prev = null
} else {
@@ -123,6 +135,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
def flush() {
trace("Flushing.")
+ metrics.flushes.inc
+
// write out the contents of the dirty list oldest first
val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
for (k <- this.dirty.reverse) {
@@ -132,6 +146,7 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
}
store.putAll(batch)
store.flush
+ metrics.flushBatchSize.inc(batch.size)
// reset the dirty list
this.dirty = new mutable.DoubleLinkedList[K]()
@@ -153,6 +168,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
* Perform the local delete and log it out to the changelog
*/
def delete(key: K) {
+ metrics.deletes.inc
+
put(key, null.asInstanceOf[V])
}
@@ -160,10 +177,9 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
trace("Closing.")
flush
-
+
store.close
}
-
}
private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
new file mode 100644
index 0000000..8ffcfb1
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStoreMetrics.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for a CachedStore.
+ *
+ * Counters and gauges are created through MetricsHelper's newCounter/newGauge.
+ * The getPrefix override presumably prepends "<storeName>-" to every metric
+ * name so that several stores can share one registry (confirm in MetricsHelper).
+ *
+ * @param storeName name of the store, used in the metric-name prefix
+ * @param registry registry the metrics are created in; defaults to a fresh
+ *                 MetricsRegistryMap (e.g. when CachedStore is built without
+ *                 explicit metrics)
+ */
+class CachedStoreMetrics(
+ val storeName: String = "unknown",
+ val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+ // Incremented once per CachedStore.get call, hit or miss.
+ val gets = newCounter("gets")
+ // Incremented once per range(from, to) call.
+ val ranges = newCounter("ranges")
+ // Incremented once per all() call.
+ val alls = newCounter("alls")
+ // Incremented when get finds the key in the in-memory cache.
+ val cacheHits = newCounter("cache-hits")
+ // Incremented once per put call; delete goes through put, so deletes are counted here too.
+ val puts = newCounter("puts")
+ // Incremented once per delete call.
+ val deletes = newCounter("deletes")
+ // Incremented once per flush, including the implicit flushes done by range and all.
+ val flushes = newCounter("flushes")
+ // Incremented by the number of dirty entries written on each flush, so this is a
+ // running total of flushed entries rather than the size of a single batch.
+ val flushBatchSize = newCounter("flush-batch-size")
+
+ /** Registers the "dirty-count" gauge, reading its value from getValue. */
+ def setDirtyCount(getValue: () => Int) {
+ newGauge("dirty-count", getValue)
+ }
+
+ /** Registers the "cache-size" gauge, reading its value from getValue. */
+ def setCacheSize(getValue: () => Int) {
+ newGauge("cache-size", getValue)
+ }
+
+ override def getPrefix = storeName + "-"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 92eee38..fc22383 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -37,17 +37,41 @@ import org.apache.samza.storage.StorageEngine
class KeyValueStorageEngine[K, V](
db: KeyValueStore[K, V],
rawDb: KeyValueStore[Array[Byte], Array[Byte]],
+ metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
batchSize: Int = 500) extends StorageEngine with KeyValueStore[K, V] with Logging {
var count = 0
/* delegate to underlying store */
- def get(key: K): V = db.get(key)
- def put(key: K, value: V) = db.put(key, value)
- def putAll(entries: java.util.List[Entry[K, V]]) = db.putAll(entries)
- def delete(key: K) = db.delete(key)
- def range(from: K, to: K) = db.range(from, to)
- def all() = db.all()
+ def get(key: K): V = {
+ metrics.gets.inc
+ db.get(key)
+ }
+
+ def put(key: K, value: V) = {
+ metrics.puts.inc
+ db.put(key, value)
+ }
+
+ def putAll(entries: java.util.List[Entry[K, V]]) = {
+ metrics.puts.inc(entries.size)
+ db.putAll(entries)
+ }
+
+ def delete(key: K) = {
+ metrics.deletes.inc
+ db.delete(key)
+ }
+
+ def range(from: K, to: K) = {
+ metrics.ranges.inc
+ db.range(from, to)
+ }
+
+ def all() = {
+ metrics.alls.inc
+ db.all()
+ }
/**
* Restore the contents of this key/value store from the change log,
@@ -57,13 +81,18 @@ class KeyValueStorageEngine[K, V](
val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
for (envelope <- envelopes) {
- batch.add(new Entry(envelope.getKey.asInstanceOf[Array[Byte]], envelope.getMessage.asInstanceOf[Array[Byte]]))
+ val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
+ val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
+ batch.add(new Entry(keyBytes, valBytes))
if (batch.size >= batchSize) {
rawDb.putAll(batch)
batch.clear()
}
+ // A null valBytes is a deleted key (rawDb.putAll applies it as a delete); guard .size so restore doesn't NPE on it.
+ metrics.restoredBytes.inc(keyBytes.size + (if (valBytes == null) 0 else valBytes.size))
+ metrics.restoredMessages.inc
count += 1
if (count % 1000000 == 0) {
@@ -79,6 +108,8 @@ class KeyValueStorageEngine[K, V](
def flush() = {
trace("Flushing.")
+ metrics.flushes.inc
+
db.flush
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
index dbdefa0..d42f46e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -57,22 +57,27 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
throw new SamzaException("Must define a message serde when using key value storage.")
}
- val levelDb = new LevelDbKeyValueStore(storeDir, LevelDbKeyValueStore.options(storageConfig))
+ val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
+ val levelDb = new LevelDbKeyValueStore(storeDir, LevelDbKeyValueStore.options(storageConfig), levelDbMetrics)
val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
levelDb
} else {
- new LoggedStore(levelDb, changeLogSystemStreamPartition, collector)
+ val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
+ new LoggedStore(levelDb, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
}
- val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde)
+ val serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry)
+ val serialized = new SerializedKeyValueStore[K, V](maybeLoggedStore, keySerde, msgSerde, serializedMetrics)
val maybeCachedStore = if (enableCache) {
- new CachedStore(serialized, cacheSize, batchSize)
+ val cachedStoreMetrics = new CachedStoreMetrics(storeName, registry)
+ new CachedStore(serialized, cacheSize, batchSize, cachedStoreMetrics)
} else {
serialized
}
val db = maybeCachedStore
+ val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
// Decide if we should use raw bytes when restoring
- new KeyValueStorageEngine(db, levelDb, batchSize)
+ new KeyValueStorageEngine(db, levelDb, keyValueStorageEngineMetrics, batchSize)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
new file mode 100644
index 0000000..a7958f6
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for a KeyValueStorageEngine: one counter per delegated store
+ * operation plus counters for changelog restoration.
+ *
+ * The getPrefix override presumably prepends "<storeName>-" to every metric
+ * name (confirm in MetricsHelper).
+ *
+ * @param storeName name of the store, used in the metric-name prefix
+ * @param registry registry the counters are created in; defaults to a fresh
+ *                 MetricsRegistryMap
+ */
+class KeyValueStorageEngineMetrics(
+ val storeName: String = "unknown",
+ val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+ val gets = newCounter("gets")
+ val ranges = newCounter("ranges")
+ val alls = newCounter("alls")
+ // Incremented by 1 per put and by entries.size per putAll.
+ val puts = newCounter("puts")
+ val deletes = newCounter("deletes")
+ val flushes = newCounter("flushes")
+
+ // Incremented once per changelog message applied during restore.
+ val restoredMessages = newCounter("messages-restored")
+ // Incremented by the key and value byte lengths of each restored message
+ // (exported under the name "messages-bytes").
+ val restoredBytes = newCounter("messages-bytes")
+
+ override def getPrefix = storeName + "-"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index ac6da25..d37acf9 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -44,57 +44,76 @@ object LevelDbKeyValueStore {
class LevelDbKeyValueStore(
val dir: File,
- val options: Options) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
+ val options: Options,
+ val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
private lazy val db = factory.open(dir, options)
private val lexicographic = new LexicographicComparator()
def get(key: Array[Byte]): Array[Byte] = {
+ metrics.gets.inc
require(key != null, "Null key not allowed.")
val found = db.get(key)
- if (found == null)
- null
- else
- found
+ if (found != null) {
+ metrics.bytesRead.inc(found.size)
+ }
+ found
}
def put(key: Array[Byte], value: Array[Byte]) {
+ metrics.puts.inc
require(key != null, "Null key not allowed.")
- if (value == null)
+ if (value == null) {
db.delete(key)
- else
+ } else {
+ metrics.bytesWritten.inc(key.size + value.size)
db.put(key, value)
+ }
}
def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
val batch = db.createWriteBatch()
val iter = entries.iterator
+ var wrote = 0
+ var deletes = 0
while (iter.hasNext) {
+ wrote += 1
val curr = iter.next()
- if (curr.getValue == null)
+ if (curr.getValue == null) {
+ deletes += 1
batch.delete(curr.getKey)
- else
- batch.put(curr.getKey, curr.getValue)
+ } else {
+ val key = curr.getKey
+ val value = curr.getValue
+ metrics.bytesWritten.inc(key.size + value.size)
+ batch.put(key, value)
+ }
}
db.write(batch)
+ metrics.puts.inc(wrote)
+ metrics.deletes.inc(deletes)
}
def delete(key: Array[Byte]) {
+ metrics.deletes.inc
put(key, null)
}
def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ metrics.ranges.inc
require(from != null && to != null, "Null bound not allowed.")
new LevelDbRangeIterator(db.iterator, from, to)
}
def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ metrics.alls.inc
val iter = db.iterator()
iter.seekToFirst()
new LevelDbIterator(iter)
}
def flush {
+ metrics.flushes.inc
// TODO can't find a flush for leveldb
trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
}
@@ -114,8 +133,14 @@ class LevelDbKeyValueStore(
def remove() = iter.remove()
def hasNext() = iter.hasNext()
def next() = {
- val curr = iter.next()
- new Entry(curr.getKey, curr.getValue)
+ val curr = iter.next
+ val key = curr.getKey
+ val value = curr.getValue
+ metrics.bytesRead.inc(key.size)
+ if (value != null) {
+ metrics.bytesRead.inc(value.size)
+ }
+ new Entry(key, value)
}
override def finalize() {
if (open) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
new file mode 100644
index 0000000..8949f2f
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStoreMetrics.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.MetricsHelper
+
+/**
+ * Metrics for a LevelDbKeyValueStore: operation counts plus bytes moved
+ * to and from LevelDB.
+ *
+ * The getPrefix override presumably prepends "<storeName>-" to every metric
+ * name (confirm in MetricsHelper).
+ *
+ * @param storeName name of the store, used in the metric-name prefix
+ * @param registry registry the counters are created in; defaults to a fresh
+ *                 MetricsRegistryMap
+ */
+class LevelDbKeyValueStoreMetrics(
+ val storeName: String = "unknown",
+ val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+ val gets = newCounter("gets")
+ val ranges = newCounter("ranges")
+ val alls = newCounter("alls")
+ // Counts every write: put increments it even when value is null (a delete),
+ // and putAll adds the number of entries, null-valued ones included.
+ val puts = newCounter("puts")
+ // delete increments this and then calls put, so each delete is also a put;
+ // putAll adds the number of null-valued entries.
+ val deletes = newCounter("deletes")
+ val flushes = newCounter("flushes")
+ // Key plus value length of each non-null write (put and putAll).
+ val bytesWritten = newCounter("bytes-written")
+ // Value length of each successful get, plus key and value length of each
+ // entry returned by the store's iterator next().
+ val bytesRead = newCounter("bytes-read")
+
+ override def getPrefix = storeName + "-"
+}
\ No newline at end of file