You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/24 02:52:16 UTC

samza git commit: SAMZA-951 - Improve event loop timing metrics

Repository: samza
Updated Branches:
  refs/heads/master 6ae7784a5 -> 9f7abf535


SAMZA-951 - Improve event loop timing metrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f7abf53
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f7abf53
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f7abf53

Branch: refs/heads/master
Commit: 9f7abf535822a1de4d6ac6ee73cb3b879800e4a3
Parents: 6ae7784
Author: Jacob Maes <ja...@gmail.com>
Authored: Mon May 23 19:51:17 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon May 23 19:51:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    | 10 +--
 .../apache/samza/system/SystemConsumers.scala   | 73 +++++++++++++-------
 .../samza/system/SystemConsumersMetrics.scala   |  2 +
 .../apache/samza/container/TestRunLoop.scala    |  2 +-
 .../samza/system/TestSystemConsumers.scala      | 12 +++-
 5 files changed, 67 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 6916c5c..3f25eca 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -103,11 +103,13 @@ class RunLoop(
     trace("Attempting to choose a message to process.")
     metrics.processes.inc
 
-    activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
-      val envelope = updateTimer(metrics.chooseNs) {
-        consumerMultiplexer.choose
-      }
+    // Exclude choose time from activeNs. Although choose time also includes
+    // deserialization time, it is the closest available measure of idle time.
+    val envelope = updateTimer(metrics.chooseNs) {
+     consumerMultiplexer.choose
+    }
 
+    activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => {
       if (envelope != null) {
         val ssp = envelope.getSystemStreamPartition
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 32fc771..2efe836 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -19,9 +19,12 @@
 
 package org.apache.samza.system
 
+
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConversions._
 import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, TimerUtils}
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 import java.util.HashMap
@@ -44,7 +47,7 @@ object SystemConsumers {
  * messages, poll the MessageChooser for the next message to process, and
  * return that message to the SamzaContainer.
  */
-class SystemConsumers(
+class SystemConsumers (
 
   /**
    * The class that determines the order to process incoming messages.
@@ -59,12 +62,12 @@ class SystemConsumers(
   /**
    * The class that handles deserialization of incoming messages.
    */
-  serdeManager: SerdeManager = new SerdeManager,
+  serdeManager: SerdeManager,
 
   /**
    * A helper class to hold all of SystemConsumers' metrics.
    */
-  metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
+  metrics: SystemConsumersMetrics,
 
   /**
    * If MessageChooser returns null when it's polled, SystemConsumers will
@@ -73,14 +76,14 @@ class SystemConsumers(
    * thread will sit in a tight loop polling every SystemConsumer over and
    * over again if no new messages are available.
    */
-  noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+  noNewMessagesTimeout: Int,
 
   /**
    * This parameter is to define how to deal with deserialization failure. If
    * set to true, the task will skip the messages when deserialization fails.
    * If set to false, the task will throw SamzaException and fail the container.
    */
-  dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+  dropDeserializationError: Boolean,
 
   /**
    * <p>Defines an upper bound for how long the SystemConsumers will wait
@@ -96,13 +99,29 @@ class SystemConsumers(
    * with no remaining unprocessed messages, the SystemConsumers will poll for
    * it within 50ms of its availability in the stream system.</p>
    */
-  pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS,
+  pollIntervalMs: Int,
 
   /**
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  clock: () => Long = () => System.currentTimeMillis) extends Logging {
+  val clock: () => Long) extends Logging with TimerUtils {
+
+  def this(chooser: MessageChooser,
+           consumers: Map[String, SystemConsumer],
+           serdeManager: SerdeManager = new SerdeManager,
+           metrics: SystemConsumersMetrics = new SystemConsumersMetrics,
+           noNewMessagesTimeout: Int = SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+           dropDeserializationError: Boolean = SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+           pollIntervalMs: Int = SystemConsumers.DEFAULT_POLL_INTERVAL_MS) =
+    this(chooser,
+         consumers,
+         serdeManager,
+         metrics,
+         noNewMessagesTimeout,
+         dropDeserializationError,
+         pollIntervalMs,
+         () => System.nanoTime())
 
   /**
    * A buffer of incoming messages grouped by SystemStreamPartition. These
@@ -128,7 +147,7 @@ class SystemConsumers(
   /**
    * The last time that systems were polled for new messages.
    */
-  var lastPollMs = 0L
+  var lastPollNs = 0L
 
   /**
    * Total number of unprocessed messages in unprocessedMessagesBySSP.
@@ -187,28 +206,32 @@ class SystemConsumers(
   def choose: IncomingMessageEnvelope = {
     val envelopeFromChooser = chooser.choose
 
-    if (envelopeFromChooser == null) {
-      trace("Chooser returned null.")
+    updateTimer(metrics.deserializationNs) {
+      if (envelopeFromChooser == null) {
+       trace("Chooser returned null.")
 
-      metrics.choseNull.inc
+       metrics.choseNull.inc
 
-      // Sleep for a while so we don't poll in a tight loop.
-      timeout = noNewMessagesTimeout
-    } else {
-      val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
+       // Sleep for a while so we don't poll in a tight loop.
+       timeout = noNewMessagesTimeout
+      } else {
+       val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition
 
-      trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
+       trace("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
 
-      // Ok to give the chooser a new message from this stream.
-      timeout = 0
-      metrics.choseObject.inc
-      metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
+       // Ok to give the chooser a new message from this stream.
+       timeout = 0
+       metrics.choseObject.inc
+       metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition).inc
 
-      tryUpdate(systemStreamPartition)
+       tryUpdate(systemStreamPartition)
+      }
     }
 
-    if (envelopeFromChooser == null || lastPollMs < clock() - pollIntervalMs) {
-      refresh
+    updateTimer(metrics.pollNs) {
+      if (envelopeFromChooser == null || TimeUnit.NANOSECONDS.toMillis(clock() - lastPollNs) > pollIntervalMs) {
+        refresh
+      }
     }
 
     envelopeFromChooser
@@ -280,7 +303,7 @@ class SystemConsumers(
     trace("Refreshing chooser with new messages.")
 
     // Update last poll time so we don't poll too frequently.
-    lastPollMs = clock()
+    lastPollNs = clock()
 
     // Poll every system for new messages.
     consumers.keys.map(poll(_))

http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index e7f012f..43d381b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -32,6 +32,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
   val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStreamPartition, Counter]()
+  val pollNs = newTimer("poll-ns")
+  val deserializationNs = newTimer("deserialization-ns")
 
   def setNeededByChooser(getValue: () => Int) {
     newGauge("ssps-needed-by-chooser", getValue)

http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ad37447..e280daa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -217,7 +217,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
 
     testMetrics.chooseNs.getSnapshot.getAverage should equal(1000000L)
     testMetrics.windowNs.getSnapshot.getAverage should equal(1000000L)
-    testMetrics.processNs.getSnapshot.getAverage should equal(3000000L)
+    testMetrics.processNs.getSnapshot.getAverage should equal(1000000L)
     testMetrics.commitNs.getSnapshot.getAverage should equal(1000000L)
 
     now = 0L

http://git-wip-us.apache.org/repos/asf/samza/blob/9f7abf53/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index fbaa8ee..09da62e 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -39,7 +39,11 @@ class TestSystemConsumers {
     val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
     val consumer = new CustomPollResponseSystemConsumer(envelope)
     var now = 0L
-    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+                                        new SerdeManager, new SystemConsumersMetrics,
+                                        SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+                                        SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+                                        SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
     consumers.register(systemStreamPartition0, "0")
     consumers.register(systemStreamPartition1, "1234")
@@ -97,7 +101,11 @@ class TestSystemConsumers {
     val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
     val consumer = new CustomPollResponseSystemConsumer(envelope)
     var now = 0
-    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), clock = () => now)
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+                                        new SerdeManager, new SystemConsumersMetrics,
+                                        SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+                                        SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+                                        SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
     consumers.register(systemStreamPartition, "0")
     consumers.start