You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/02 22:16:45 UTC
samza git commit: SAMZA-691: added partition to the messages-chosen counter in SystemConsumersMetrics
Repository: samza
Updated Branches:
refs/heads/master ddee33dbe -> f77595804
SAMZA-691: added partition to the messages-chosen counter in SystemConsumersMetrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f7759580
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f7759580
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f7759580
Branch: refs/heads/master
Commit: f7759580444034c7a030197fcd497d1bd339e792
Parents: ddee33d
Author: József Márton Jung <j....@levi9.com>
Authored: Tue Jun 2 13:16:41 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Tue Jun 2 13:16:41 2015 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/samza/system/SystemConsumers.scala | 4 ++--
.../scala/org/apache/samza/system/SystemConsumersMetrics.scala | 6 +++---
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f7759580/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 1ec5e32..32fc771 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -173,7 +173,7 @@ class SystemConsumers(
def register(systemStreamPartition: SystemStreamPartition, offset: String) {
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
- metrics.registerSystemStream(systemStreamPartition.getSystemStream)
+ metrics.registerSystemStreamPartition(systemStreamPartition)
unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
chooser.register(systemStreamPartition, offset)
@@ -202,7 +202,7 @@ class SystemConsumers(
// Ok to give the chooser a new message from this stream.
timeout = 0
metrics.choseObject.inc
- metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
+ metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
tryUpdate(systemStreamPartition)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f7759580/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index a63349c..e7f012f 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -31,7 +31,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
val systemPolls = scala.collection.mutable.Map[String, Counter]()
val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
- val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]()
+ val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStreamPartition, Counter]()
def setNeededByChooser(getValue: () => Int) {
newGauge("ssps-needed-by-chooser", getValue)
@@ -53,7 +53,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
}
}
- def registerSystemStream(systemStream: SystemStream) {
- systemStreamMessagesChosen += systemStream -> newCounter("%s-%s-messages-chosen" format (systemStream.getSystem, systemStream.getStream))
+ def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) {
+ systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId))
}
}
\ No newline at end of file