You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/06/02 22:16:45 UTC

samza git commit: SAMZA-691: added partition to the messages-chosen counter in SystemConsumersMetrics

Repository: samza
Updated Branches:
  refs/heads/master ddee33dbe -> f77595804


SAMZA-691: added partition to the messages-chosen counter in SystemConsumersMetrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f7759580
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f7759580
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f7759580

Branch: refs/heads/master
Commit: f7759580444034c7a030197fcd497d1bd339e792
Parents: ddee33d
Author: József Márton Jung <j....@levi9.com>
Authored: Tue Jun 2 13:16:41 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Tue Jun 2 13:16:41 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/SystemConsumers.scala   | 4 ++--
 .../scala/org/apache/samza/system/SystemConsumersMetrics.scala | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f7759580/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 1ec5e32..32fc771 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -173,7 +173,7 @@ class SystemConsumers(
 
   def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
-    metrics.registerSystemStream(systemStreamPartition.getSystemStream)
+    metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
     chooser.register(systemStreamPartition, offset)
 
@@ -202,7 +202,7 @@ class SystemConsumers(
       // Ok to give the chooser a new message from this stream.
       timeout = 0
       metrics.choseObject.inc
-      metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
+      metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
 
       tryUpdate(systemStreamPartition)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/f7759580/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index a63349c..e7f012f 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -31,7 +31,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
   val systemPolls = scala.collection.mutable.Map[String, Counter]()
   val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
-  val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]()
+  val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStreamPartition, Counter]()
 
   def setNeededByChooser(getValue: () => Int) {
     newGauge("ssps-needed-by-chooser", getValue)
@@ -53,7 +53,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
     }
   }
 
-  def registerSystemStream(systemStream: SystemStream) {
-    systemStreamMessagesChosen += systemStream -> newCounter("%s-%s-messages-chosen" format (systemStream.getSystem, systemStream.getStream))
+  def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) {
+    systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId))
   }
 }
\ No newline at end of file